Implement basic rate limiting for message sending

This commit is contained in:
Niels Andriesse 2021-07-12 16:13:24 +10:00
parent 6daf08e6c8
commit 70a34b7bfb
3 changed files with 42 additions and 1 deletions

2
Cargo.lock generated
View File

@ -1745,7 +1745,7 @@ dependencies = [
[[package]]
name = "session-open-group-server"
version = "0.1.6"
version = "0.1.7"
dependencies = [
"aes-gcm",
"base64",

View File

@ -10,6 +10,7 @@ pub enum Error {
/// The requesting user didn't provide an auth token for a route that requires one.
NoAuthToken,
NoSuchRoom,
RateLimited,
/// The requesting user provided a valid auth token, but they don't have a high enough permission level.
Unauthorized,
ValidationFailed,
@ -23,6 +24,7 @@ pub fn status_code(e: Rejection) -> StatusCode {
Error::DecryptionFailed | Error::InvalidOnionRequest | Error::InvalidRpcCall
| Error::NoSuchRoom | Error::ValidationFailed => return StatusCode::BAD_REQUEST,
Error::NoAuthToken => return StatusCode::UNAUTHORIZED,
Error::RateLimited => return StatusCode::TOO_MANY_REQUESTS,
Error::Unauthorized => return StatusCode::FORBIDDEN,
Error::DatabaseFailedInternally => return StatusCode::INTERNAL_SERVER_ERROR
};

View File

@ -474,6 +474,18 @@ pub fn insert_message(
// Get a connection and open a transaction
let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
// Check if the requesting user needs to be rate limited
let last_5_messages = get_last_5_messages(&requesting_public_key, pool)?;
let should_rate_limit: bool;
if last_5_messages.len() == 5 {
let interval = last_5_messages[0].timestamp - last_5_messages[4].timestamp;
should_rate_limit = interval < 8 * 1000;
} else {
should_rate_limit = false;
}
if should_rate_limit {
return Err(warp::reject::custom(Error::RateLimited));
}
// Insert the message
let timestamp = chrono::Utc::now().timestamp_millis();
message.timestamp = timestamp;
@ -506,6 +518,33 @@ pub fn insert_message(
return Ok(warp::reply::json(&response).into_response());
}
fn get_last_5_messages(
public_key: &str, pool: &storage::DatabaseConnectionPool,
) -> Result<Vec<models::Message>, Rejection> {
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let raw_query = format!(
"SELECT id, public_key, timestamp, data, signature FROM {} WHERE public_key = (?1) ORDER BY timestamp DESC LIMIT 5",
storage::MESSAGES_TABLE
);
let mut query = conn.prepare(&raw_query).map_err(|_| Error::DatabaseFailedInternally)?;
let rows = match query.query_map(params![public_key], |row| {
Ok(models::Message {
server_id: row.get(0)?,
public_key: row.get(1)?,
timestamp: row.get(2)?,
data: row.get(3)?,
signature: row.get(4)?,
})
}) {
Ok(rows) => rows,
Err(e) => {
error!("Couldn't get last 5 messages due to error: {}.", e);
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
return Ok(rows.filter_map(|result| result.ok()).collect());
}
/// Returns either the last `limit` messages or all messages since `from_server_id, limited to `limit`.
pub fn get_messages(
query_params: HashMap<String, String>, auth_token: &str, pool: &storage::DatabaseConnectionPool,