Make it work

This commit is contained in:
Jason Rhinelander 2021-09-25 22:55:48 -03:00
parent 8f7bacb62d
commit 68f6ab50c1
9 changed files with 153 additions and 105 deletions

2
Cargo.lock generated
View File

@ -1809,6 +1809,7 @@ dependencies = [
"curve25519-dalek",
"curve25519-parser",
"ed25519-dalek",
"form_urlencoded",
"futures",
"hex",
"hmac",
@ -1832,7 +1833,6 @@ dependencies = [
"structopt",
"tokio",
"tokio-test",
"url",
"warp",
"x25519-dalek",
]

View File

@ -20,6 +20,7 @@ chrono = "0.4"
curve25519-dalek = "3"
curve25519-parser = "0.2"
ed25519-dalek = "^1.0.1"
form_urlencoded = "1"
futures = "0.3"
hex = "0.4"
hmac = "0.10"
@ -42,7 +43,6 @@ serde_json = "1.0"
sha2 = "0.9"
structopt = "0.3"
tokio = { version = "^1.3", features = ["full"] }
url = "2.2.1"
warp = { version = "0.3", features = ["tls"] }
x25519-dalek = "^1.1"

View File

@ -51,7 +51,7 @@ lazy_static::lazy_static! {
hasher.update(PRIVATE_KEY.to_bytes());
hasher.update(PUBLIC_KEY.as_bytes());
let res = hasher.finalize();
let secret = ed25519_dalek::SecretKey::from_bytes(&res[..]).unwrap();
let secret = ed25519_dalek::SecretKey::from_bytes(&res[0..32]).unwrap();
let public = ed25519_dalek::PublicKey::from(&secret);
ed25519_dalek::Keypair{ secret, public }
};

View File

@ -6,7 +6,7 @@ use std::time::{SystemTime, Duration};
use base64;
use ed25519_dalek::Signer;
use log::{error, info, warn};
use log::{error, info, warn, debug};
use parking_lot::RwLock;
use regex::Regex;
use rusqlite::{params, params_from_iter};
@ -19,7 +19,7 @@ use super::errors::Error;
use super::models::{Room, User, Message};
use super::models;
use super::rpc;
use super::storage;
use super::storage::{self, db_error};
// TODO FIXME: the user/room arguments are rather random in here. Should rearrange them all so
// that room-dependent functions have args (room, user, ...)
@ -152,14 +152,14 @@ pub fn get_room(room: &Room) -> Result<Response, Rejection> {
pub fn get_all_rooms() -> Result<Response, Rejection> {
let rooms = match storage::get_conn()?.prepare_cached("SELECT * from rooms ORDER BY token")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_map(params![], Room::from_row) {
Ok(rows) => rows,
Err(e) => {
error!("Couldn't get rooms: {}.", e);
return Err(Error::DatabaseFailedInternally.into());
}
}.collect::<Result<Vec<Room>, _>>().map_err(|_| Error::DatabaseFailedInternally)?;
}.collect::<Result<Vec<Room>, _>>().map_err(db_error)?;
// Return
let response = json!({ "status_code": StatusCode::OK.as_u16(), "rooms": rooms });
@ -250,7 +250,7 @@ fn store_file_impl<'a>(
let db_filename: Option<String> = filename.map(|f| UPLOAD_FILENAME_BAD.replace_all(f, "_").into());
upload.id = match upload.tx.as_ref().unwrap().prepare_cached(
"INSERT INTO files (room, uploader, size, filename, path) VALUES (?, ?, ?, ?, 'tmp') RETURNING id")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, user.id, bytes, db_filename], |row| row.get(0)) {
Ok(r) => r,
Err(e) => {
@ -267,7 +267,7 @@ fn store_file_impl<'a>(
}
fs_filename = format!("{}/{}_{}", files_dir, upload.id, fs_filename);
if let Err(e) = upload.prepare_cached("UPDATE files SET path = ? WHERE id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![fs_filename, upload.id]) {
error!("Unable to update stored path to '{}': {}", fs_filename, e);
return Err(Error::DatabaseFailedInternally.into());
@ -378,7 +378,7 @@ pub fn get_file(room: Room, id: i64, user: User) -> Result<Response, Rejection>
require_authorization(&conn, &user, &room, AuthorizationRequired { read: true, ..Default::default() })?;
let row = conn.prepare_cached("SELECT path FROM files WHERE room = ? AND id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, id], |row| row.get(0));
file_response(row)
}
@ -391,7 +391,7 @@ pub async fn get_room_image(room: Room) -> Result<Response, Rejection> {
let conn = storage::get_conn()?;
let row = conn.prepare_cached("SELECT path FROM rooms JOIN files ON rooms.image = files.id WHERE rooms.id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id], |row| row.get(0));
file_response(row)
}
@ -406,7 +406,7 @@ pub async fn set_room_image(
let mut upload = store_file_impl(&mut conn, &room, &user, auth, data_b64, filename)?;
if let Err(e) = upload.prepare_cached("UPDATE rooms SET image = ? WHERE id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![upload.id, room.id]) {
error!("Failed to update room file: {}", e);
return Err(Error::DatabaseFailedInternally.into());
@ -468,9 +468,9 @@ pub fn insert_or_update_user(conn: &rusqlite::Connection, session_id: &str) -> R
INSERT INTO users (session_id) VALUES (?)
ON CONFLICT DO UPDATE SET last_active = ((julianday('now') - 2440587.5)*86400.0)
RETURNING *")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![&session_id], User::from_row)
.map_err(|_| Error::DatabaseFailedInternally)?)
.map_err(db_error)?)
}
@ -540,15 +540,15 @@ pub fn insert_message(
let now_secs = unixtime_f64() - RATE_LIMIT_INTERVAL as f64;
let recent_posts: i64 = tx.prepare_cached("SELECT COUNT(*) FROM messages WHERE room = ? AND user = ? AND posted >= ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, user.id, now_secs], |row| row.get(0))
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
if recent_posts >= RATE_LIMIT_POSTS {
return Err(warp::reject::custom(Error::RateLimited));
}
// Insert the message
let message = match tx.prepare_cached("INSERT INTO messages (room, user, data, signature) VALUES (?, ?, ?, ?) RETURNING *")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, user.id, data, signature], Message::from_row) {
Ok(m) => m,
Err(e) => {
@ -612,7 +612,7 @@ pub fn get_messages(
WHERE data IS NOT NULL {} ORDER BY id {} LIMIT ?2",
if from_server_id.is_some() { "AND id > ?1" } else { "" },
if from_server_id.is_some() { "ASC" } else { "DESC" });
let result = match conn.prepare_cached(&query).map_err(|_| Error::DatabaseFailedInternally)?
let result = match conn.prepare_cached(&query).map_err(db_error)?
.query_map(params![from_server_id, limit], Message::from_row) {
Ok(rows) => rows,
Err(e) => {
@ -636,7 +636,7 @@ pub fn delete_messages(
delete_message(&tx, id, &user, &room)?;
}
tx.commit().map_err(|_| Error::DatabaseFailedInternally)?;
tx.commit().map_err(db_error)?;
let json = models::StatusCode { status_code: StatusCode::OK.as_u16() };
Ok(warp::reply::json(&json).into_response())
@ -651,7 +651,7 @@ pub fn delete_message(
// Check to see if the message to be deleted is owned by someone else: if it is, we require
// moderator access for the deletion.
let mut st = conn.prepare_cached("SELECT COUNT(*) FROM messages WHERE room = ? AND id = ? AND user != ?")
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
match st.query_row(params![room.id, id, user.id], |row| row.get::<_, i64>(0)) {
Ok(count) => { if count > 0 { auth_req.moderator = true; } },
@ -661,7 +661,7 @@ pub fn delete_message(
require_authorization(conn, user, room, auth_req)?;
let mut del_st = conn.prepare_cached("DELETE FROM messages WHERE room = ? AND id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
if let Err(e) = del_st.execute(params![room.id, id]) {
error!("Couldn't delete message: {}.", e);
@ -690,7 +690,7 @@ pub fn get_deleted_messages(
"SELECT updated, id FROM messages WHERE room = ?1 AND updated > ?2 AND data IS NULL ORDER BY updated LIMIT ?3"
} else {
"SELECT updated, id FROM messages WHERE room = ?1 AND data IS NULL ORDER BY updated DESC LIMIT ?3"
}).map_err(|_| Error::DatabaseFailedInternally)?;
}).map_err(db_error)?;
let result = match st.query_map(params![from_server_id, limit],
|row| Ok(models::DeletedMessage { updated: row.get(0)?, deleted_message_id: row.get(1)? })
) {
@ -728,7 +728,7 @@ pub fn add_moderator_impl(session_id: &str, admin: bool, room: Room) -> Result<R
let tx = storage::get_transaction(&mut conn)?;
if let Err(e) = tx.prepare_cached("INSERT OR IGNORE INTO users (session_id) VALUES (?)")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![session_id]) {
error!("Failed to insert new user row for {}: {}", session_id, e);
return Err(Error::DatabaseFailedInternally.into());
@ -739,7 +739,7 @@ pub fn add_moderator_impl(session_id: &str, admin: bool, room: Room) -> Result<R
VALUES ((SELECT id FROM users WHERE session_id = ?), ?, TRUE)
ON CONFLICT DO UPDATE SET {mod_column} = TRUE", mod_column=if admin { "admin" } else { "moderator" });
if let Err(e) = tx.prepare_cached(&add_perm_query)
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![session_id, room.id]) {
error!("Failed to insert new permission for {}: {}", session_id, e);
return Err(Error::DatabaseFailedInternally.into());
@ -774,7 +774,7 @@ pub fn delete_moderator_impl(session_id: &str, room: Room) -> Result<Response, R
let mut st = conn.prepare_cached(
"UPDATE user_permission_overrides SET moderator = FALSE, admin = FALSE
WHERE room = ? AND user = (SELECT id FROM users WHERE session_id = ?) AND (moderator OR admin)")
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
match st.execute(params![room.id, &session_id]) {
Err(e) => {
error!("Couldn't remove moderator {} from room {}: {}", session_id, room.token, e);
@ -795,9 +795,9 @@ pub fn get_moderators(conn: &rusqlite::Connection, user: &User, room: &Room) ->
let mut st = conn.prepare_cached(
"SELECT session_id FROM user_permissions WHERE room = ? AND moderator AND visible_mod"
).map_err(|_| Error::DatabaseFailedInternally)?;
).map_err(db_error)?;
let ids: Result<Vec<String>, _> = match st.query_map(params![], |row| row.get(0)) {
let ids: Result<Vec<String>, _> = match st.query_map(params![room.id], |row| row.get(0)) {
Ok(row) => row,
Err(e) => {
error!("Couldn't query database: {}.", e);
@ -827,15 +827,15 @@ pub async fn ban(session_id: &str, delete_all: bool, user: &User, room: &Room) -
let tx = storage::get_transaction(&mut conn)?;
tx.prepare_cached("INSERT OR IGNORE INTO users (session_id) VALUES (?)")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![session_id])
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
let userid: i64;
match tx.prepare_cached(
"SELECT user, moderator, global_moderator FROM user_permissions WHERE room = ? AND session_id = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, session_id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))) {
Ok((uid, is_mod, is_global_mod)) => {
if is_global_mod {
@ -860,7 +860,7 @@ pub async fn ban(session_id: &str, delete_all: bool, user: &User, room: &Room) -
if let Err(e) = tx.prepare_cached(
"INSERT INTO user_permission_overrides (room, user, banned, moderator, admin) VALUES (?, ?, TRUE, FALSE, FALSE)
ON CONFLICT DO UPDATE SET banned = TRUE, moderator = FALSE, admin = FALSE")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![room.id, userid]) {
error!("Failed to insert ban for {} in {}: {}", session_id, room.token, e);
return Err(Error::DatabaseFailedInternally.into());
@ -870,7 +870,7 @@ pub async fn ban(session_id: &str, delete_all: bool, user: &User, room: &Room) -
let mut files_removed = 0;
if delete_all {
posts_removed += match tx.prepare_cached("UPDATE messages SET data = NULL, signature = NULL WHERE room = ? AND user = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![room.id, userid]) {
Ok(count) => count,
Err(e) => {
@ -883,9 +883,9 @@ pub async fn ban(session_id: &str, delete_all: bool, user: &User, room: &Room) -
// retrievable) and set them to be expired (so that the next file pruning will delete them
// from disk).
files_removed = tx.prepare_cached("UPDATE files SET room = NULL, expiry = ? WHERE room = ? AND uploader = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![unixtime_f64(), room.id, userid])
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
}
if let Err(e) = tx.commit() {
@ -912,7 +912,7 @@ pub fn unban(
let count = match conn.prepare_cached(
"UPDATE user_permission_overrides SET banned = FALSE WHERE room = ? AND user IN (SELECT id FROM users WHERE session_id = ?)")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![room.id, session_id]) {
Ok(count) => count,
@ -941,7 +941,7 @@ pub fn get_banned_public_keys(
require_authorization(&conn, user, room, AuthorizationRequired { moderator: true, ..Default::default() })?;
let banned_members: Result<Vec<String>, _> = match conn.prepare_cached("SELECT session_id FROM user_permissions WHERE room = ? AND banned")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_map(params![room.id], |row| row.get(0)) {
Ok(rows) => rows,
Err(e) => {
@ -952,7 +952,7 @@ pub fn get_banned_public_keys(
Ok(warp::reply::json(&json!({
"status_code": StatusCode::OK.as_u16(),
"banned_members": banned_members.map_err(|_| Error::DatabaseFailedInternally)?
"banned_members": banned_members.map_err(db_error)?
})).into_response())
}
@ -970,7 +970,7 @@ pub fn get_member_count_since(user: User, room: Room, ago: Duration) -> Result<R
require_authorization(&conn, &user, &room, AuthorizationRequired { read: true, ..Default::default() })?;
let mut st = conn.prepare_cached("SELECT COUNT(*) FROM room_users WHERE room = ? AND last_active >= ?")
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
let cutoff = (SystemTime::now() - ago).duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs_f64();
let users = match st.query_row(params![room.id, cutoff], |row| Ok(row.get::<_, i64>(0)?)) {
Ok(count) => count,
@ -999,36 +999,41 @@ pub fn compact_poll(
if !rooms.is_empty() {
let query = format!("SELECT * FROM rooms WHERE token IN (?{})", ",?".repeat(rooms.len()-1));
for r in tx.prepare_cached(&query)
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_map(params_from_iter(rooms.keys()), Room::from_row)
.map_err(|_| Error::DatabaseFailedInternally)? {
.map_err(db_error)? {
let room = r.map_err(|_| Error::DatabaseFailedInternally)?;
let room = r.map_err(db_error)?;
rooms.insert(room.token.clone(), Some(room));
}
}
{
// Gets a list of recent room messages: Session fires this when first joining a room.
let mut get_recent_messages = tx.prepare_cached(
"SELECT * FROM message_details WHERE room = ? AND data IS NOT NULL ORDER BY id DESC LIMIT 256")
.map_err(db_error)?;
// Gets a list of new, updated, and deleted messages since a given room update value.
let mut get_msg_updates = tx.prepare_cached(
"SELECT * FROM messages WHERE room = ? AND updated > ? ORDER BY updated LIMIT 256")
.map_err(|_| Error::DatabaseFailedInternally)?;
"SELECT * FROM message_details WHERE room = ? AND updated > ? ORDER BY updated LIMIT 256")
.map_err(db_error)?;
// We need these for deprecated requests; can remove once we know longer support them
let mut get_deleted_msgs = tx.prepare_cached(
"SELECT id, updated FROM messages WHERE room = ? AND updated > ? AND data IS NULL ORDER BY updated LIMIT 256")
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
let mut get_msgs_since = tx.prepare_cached(
"SELECT * FROM messages WHERE room = ? AND id > ? AND data IS NOT NULL ORDER BY id LIMIT 256")
.map_err(|_| Error::DatabaseFailedInternally)?;
"SELECT * FROM message_details WHERE room = ? AND id > ? AND data IS NOT NULL ORDER BY id LIMIT 256")
.map_err(db_error)?;
for request in request_bodies {
let mut response = models::CompactPollResponseBody {
room_token: request.room_token.clone(),
status_code: StatusCode::OK.as_u16(),
messages: vec![],
deletions: None,
deletions: Some(vec![]), // This should really be None, but current Session chokes on that (even when it doesn't ask for deletions!)
moderators: vec![],
};
@ -1061,29 +1066,36 @@ pub fn compact_poll(
// Newer clients just request all messages since an update id, and get everything
// (new+edits+deletions) posted to the room since then.
if let Some(since) = request.since_update {
response.messages = get_msg_updates.query_map(
params![room.id, since], Message::from_row)
.map_err(|_| Error::DatabaseFailedInternally)?
debug!("Got unified poll request for room {} since update {}", room.token, since);
response.messages = get_msg_updates.query_map(params![room.id, since], Message::from_row)
.map_err(db_error)?
.collect::<Result<Vec<models::Message>, _>>()
.map_err(|_| Error::DatabaseFailedInternally)?;
} else {
.map_err(db_error)?;
} else if let Some(since) = request.from_message_server_id {
// Older Session clients ask us for messages since some ID & deletions since some deletion
// ID, and can't handle edits at all:
if let Some(since) = request.since_message {
response.messages = get_msgs_since.query_map(
params![room.id, since], Message::from_row)
.map_err(|_| Error::DatabaseFailedInternally)?
.collect::<Result<Vec<models::Message>, _>>()
.map_err(|_| Error::DatabaseFailedInternally)?;
}
if let Some(since) = request.since_deletion {
response.deletions = Some(get_deleted_msgs.query_map(params![room.id, since],
debug!("Got deprecated poll request for room {} messages since {}", room.token, since);
response.messages = get_msgs_since.query_map(params![room.id, since], Message::from_row)
.map_err(db_error)?
.collect::<Result<Vec<models::Message>, _>>()
.map_err(db_error)?;
} else if let Some(since) = request.from_deletion_server_id {
// Older Session making a deprecated request for deletions since N
debug!("Got deprecated poll request for room {} deletions since {}", room.token, since);
response.deletions = Some(get_deleted_msgs.query_map(
params![room.id, since],
|row| Ok(models::DeletedMessage { deleted_message_id: row.get(0)?, updated: row.get(1)? }))
.map_err(|_| Error::DatabaseFailedInternally)?
.collect::<Result<Vec<models::DeletedMessage>, _>>()
.map_err(|_| Error::DatabaseFailedInternally)?
);
}
.map_err(db_error)?
.collect::<Result<Vec<models::DeletedMessage>, _>>()
.map_err(db_error)?
);
} else {
// No request at all means return all recent messages.
debug!("Got poll request for recent messages for {}", room.token);
response.messages = get_recent_messages.query_map(params![room.id], Message::from_row)
.map_err(db_error)?
.collect::<Result<Vec<models::Message>, _>>()
.map_err(db_error)?;
}
// Get the moderators
@ -1154,15 +1166,15 @@ pub async fn get_stats_for_room(room_token: String, query_map: HashMap<String, i
let active = tx.prepare_cached(
"SELECT COUNT(*) FROM room_users WHERE room = ? AND last_active BETWEEN ? AND ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, lowerbound, upperbound], |row| Ok(row.get::<_, i64>(0)?))
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
let posts = tx.prepare_cached(
"SELECT COUNT(*) FROM messages WHERE room = ? AND posted BETWEEN ? AND ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![room.id, lowerbound, upperbound], |row| Ok(row.get::<_, i64>(0)?))
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
// FIXME: DRY this structure
let response = json!({
@ -1204,7 +1216,7 @@ fn require_authorization_no_activity(conn: &rusqlite::Connection, user: &User, r
fn require_authorization_impl(conn: &rusqlite::Connection, user: &User, room: &Room, need: AuthorizationRequired, log_active: bool) -> Result<(), Error> {
let mut st = conn.prepare_cached(
"SELECT banned, read, write, upload, moderator, admin FROM user_permissions WHERE room = ? AND user = ?"
).map_err(|_| Error::DatabaseFailedInternally)?;
).map_err(db_error)?;
match st.query_row(params![room.id, user.id], |row| {
let banned: bool = row.get(0)?;
@ -1229,9 +1241,9 @@ fn require_authorization_impl(conn: &rusqlite::Connection, user: &User, room: &R
if log_active {
conn.prepare_cached("UPDATE room_users SET last_active = ? WHERE user = ? AND room = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.execute(params![unixtime_f64(), user.id, room.id])
.map_err(|_| Error::DatabaseFailedInternally)?;
.map_err(db_error)?;
}
Ok(())
}

View File

@ -17,7 +17,7 @@ pub fn init(log_file: Option<String>, log_level: Option<String>) {
LevelFilter::Info
};
let stdout_appender = {
let encoder = Box::new(PatternEncoder::new("{h({l})} {d} - {m}{n}"));
let encoder = Box::new(PatternEncoder::new("{h({l})} {d(%Y-%m-%d %H:%M:%S%.3f)} [{f}:{L}] {m}{n}"));
let stdout = ConsoleAppender::builder().encoder(encoder).build();
let filter = Box::new(ThresholdFilter::new(level));
Appender::builder().filter(filter).build("stdout", Box::new(stdout))

View File

@ -45,7 +45,7 @@ pub struct Message {
pub timestamp: i64, // unix epoch milliseconds; Deprecated in favour of `posted`
pub posted: f64, // unix epoch seconds when the message was created
pub edited: Option<f64>, // unix epoch seconds when the message was last edited (null if never edited)
pub update: Option<i64>, // Set to the room's current `updates` value when created or last edited/deleted
pub updated: i64, // Set to the room's current `updates` value when created or last edited/deleted
#[serde(skip_serializing_if = "Option::is_none", serialize_with = "as_opt_base64")]
pub data: Option<Vec<u8>>,
#[serde(skip_serializing_if = "Option::is_none", serialize_with = "as_opt_base64")]
@ -87,7 +87,7 @@ impl Message {
timestamp: (posted * 1000.0) as i64,
posted,
edited: row.get(row.column_index("edited")?)?,
update: row.get(row.column_index("updated")?)?,
updated: row.get(row.column_index("updated")?)?,
data,
signature: row.get(row.column_index("signature")?)?,
deleted
@ -154,15 +154,22 @@ pub struct ChangeModeratorRequestBody {
pub struct CompactPollRequestBody {
#[serde(rename = "room_id")]
pub room_token: String,
// Deprecated: older Session clients pass the authorization token through this. Newer clients
// should use signed requests instead.
pub auth_token: Option<String>,
// New querying ability, returns all new+edited+deleted messages since the given value
// New querying ability: returns all messages (new, updates, and deletions) since the given
// room update counter.
pub since_update: Option<i64>,
// Old querying:
#[serde(rename = "from_deletion_server_id")]
pub since_deletion: Option<i64>,
#[serde(rename = "from_message_server_id")]
pub since_message: Option<i64>,
// Old, deprecated querying. These return separate lists for messages and deletions, and do
// not support message updates at all. Both may be given at once.
pub from_deletion_server_id: Option<i64>,
pub from_message_server_id: Option<i64>,
// If none of the above since/from options are given, we return the most recent 256 messages,
// not including deletion markers, in reverse chronological order (i.e. latest message first).
}
#[derive(Debug, Serialize)]
@ -170,6 +177,7 @@ pub struct CompactPollResponseBody {
#[serde(rename = "room_id")]
pub room_token: String,
pub status_code: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub deletions: Option<Vec<DeletedMessage>>,
pub messages: Vec<Message>,
pub moderators: Vec<String>,

View File

@ -87,20 +87,21 @@ pub async fn handle_rpc_call(mut rpc_call: RpcCall) -> Result<Response, Rejectio
if !rpc_call.endpoint.starts_with('/') {
rpc_call.endpoint = format!("/{}", rpc_call.endpoint);
}
let path: String = match rpc_call.endpoint.parse::<http::Uri>() {
Ok(uri) => uri.path().trim_start_matches('/').to_string(),
let path: String;
let query_params: HashMap<String, String>;
match rpc_call.endpoint.parse::<http::Uri>() {
Ok(uri) => {
path = uri.path().trim_start_matches('/').to_string();
query_params = match uri.query() {
Some(qs) => form_urlencoded::parse(qs.as_bytes()).into_owned().collect(),
None => HashMap::new()
};
}
Err(e) => {
warn!("Couldn't parse URI from '{}': {}.", &rpc_call.endpoint, e);
return Err(Error::InvalidRpcCall.into());
}
};
let query_params: HashMap<String, String> = match url::Url::parse(&rpc_call.endpoint) {
Ok(url) => url.query_pairs().into_owned().collect(),
Err(e) => {
warn!("Couldn't parse URL from '{}': {}.", &rpc_call.endpoint, e);
return Err(Error::InvalidRpcCall.into());
}
};
// TODO FIXME: rather than get the room from a header, here, we should consistently rewrite
// urls to include the room identifier, e.g. POST /r/room123/message (or similar), and handle

View File

@ -47,7 +47,7 @@ CREATE TRIGGER messages_insert_counter AFTER INSERT ON messages
FOR EACH ROW
BEGIN
UPDATE rooms SET updates = updates + 1 WHERE id = NEW.room;
UPDATE messages SET updated = (SELECT updates FROM rooms WHERE id = NEW.room);
UPDATE messages SET updated = (SELECT updates FROM rooms WHERE id = NEW.room) WHERE id = NEW.id;
END;
-- Trigger to record the old value into message_history whenever data is updated, and update the
@ -57,7 +57,7 @@ FOR EACH ROW WHEN NEW.data IS NOT OLD.data
BEGIN
INSERT INTO message_history (message, data, signature) VALUES (NEW.id, OLD.data, OLD.signature);
UPDATE rooms SET updates = updates + 1 WHERE id = NEW.room;
UPDATE messages SET updated = (SELECT updates FROM rooms WHERE id = NEW.room);
UPDATE messages SET updated = (SELECT updates FROM rooms WHERE id = NEW.room) WHERE id = NEW.id;
END;
-- Trigger to remove the room's pinned message when that message is deleted
@ -129,6 +129,13 @@ BEGIN
END;
-- Effectively the same as `messages` except that it also includes the `session_id` from the users
-- table of the user who posted it, which we often need when returning a message list to clients.
CREATE TABLE message_details AS
SELECT messages.*, users.session_id FROM messages JOIN users ON messages.user = users.id;
CREATE TABLE room_users (
room INTEGER NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,

View File

@ -4,7 +4,7 @@ use std::time::{SystemTime, Duration};
use log::{error, warn, info};
use r2d2_sqlite::SqliteConnectionManager;
use regex::Regex;
use rusqlite::params;
use rusqlite::{params, config::DbConfig};
//use rusqlite_migration::{Migrations, M};
use super::errors::Error;
@ -54,21 +54,41 @@ lazy_static::lazy_static! {
static ref DB_POOL: DatabaseConnectionPool = {
let file_name = "sogs.db";
let db_manager = r2d2_sqlite::SqliteConnectionManager::file(file_name)
.with_init(|c| c.execute_batch("
PRAGMA foreign_keys = ON;
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
"));
.with_init(|c| {
c.set_prepared_statement_cache_capacity(100);
c.execute_batch("
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
")?;
if !c.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_FKEY, true)? {
panic!("Unable to enable foreign key support; perhaps sqlite3 is compiled without it‽");
}
if !c.set_db_config(DbConfig::SQLITE_DBCONFIG_ENABLE_TRIGGER, true)? {
panic!("Unable to enable trigger support; perhaps sqlite3 is built without it‽");
}
Ok(())
});
return r2d2::Pool::new(db_manager).unwrap();
};
}
pub fn get_conn() -> Result<DatabaseConnection, Error> {
Ok(DB_POOL.get().map_err(|_| Error::DatabaseFailedInternally)?)
pub fn get_conn() -> Result<DatabaseConnection, errors::Error> {
match DB_POOL.get() {
Ok(conn) => Ok(conn),
Err(e) => {
error!("Unable to get database connection: {}", e);
return Err(errors::Error::DatabaseFailedInternally);
}
}
}
pub fn get_transaction<'a>(conn: &'a mut DatabaseConnection) -> Result<DatabaseTransaction<'a>, Error> {
conn.transaction().map_err(|_| Error::DatabaseFailedInternally)
pub fn get_transaction<'a>(conn: &'a mut DatabaseConnection) -> Result<DatabaseTransaction<'a>, errors::Error> {
conn.transaction().map_err(db_error)
}
pub fn db_error(e: rusqlite::Error) -> errors::Error {
error!("Database query failed: {}", e);
return errors::Error::DatabaseFailedInternally;
}
/// Initialize the database, creating and migrating its structure if necessary.
@ -222,7 +242,7 @@ fn apply_permission_updates(conn: &mut DatabaseConnection, now: &SystemTime) {
pub fn get_room_from_token(conn: &rusqlite::Connection, token: &str) -> Result<Room, Error> {
match conn.prepare_cached("SELECT * FROM rooms WHERE token = ?")
.map_err(|_| Error::DatabaseFailedInternally)?
.map_err(db_error)?
.query_row(params![&token], Room::from_row) {
Ok(room) => return Ok(room),
Err(rusqlite::Error::QueryReturnedNoRows) => return Err(Error::NoSuchRoom.into()),