add active session stats

* make a table for user activity so we can calculate the number of active sessions
* every time a public key fetches messages mark it as an active session
* expose an api endpoint to fetch the number of active sessions from now to a given number of seconds ago
* make sure user tables are updated when we run migrations
* give posts posted in the last time slice along with active users
This commit is contained in:
Jeff Becker 2021-05-28 08:49:02 -04:00
parent bc328fff4c
commit d73e2cbb48
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
4 changed files with 120 additions and 1 deletions

View File

@ -556,7 +556,36 @@ pub fn get_messages(
return Err(warp::reject::custom(Error::DatabaseFailedInternally));
}
};
let messages: Vec<models::Message> = rows.filter_map(|result| result.ok()).collect();
// record activity sample for statistics
// let it fail as that isn't a big deal
{
let pubkey = match get_public_key_for_auth_token(auth_token, pool) {
Ok(pubkey) => pubkey,
Err(_) => None,
};
match pubkey {
Some(pubkey) => {
let mut conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let now = chrono::Utc::now().timestamp();
let raw_stats_stmt = format!(
"INSERT OR REPLACE INTO {}(public_key, last_active) VALUES(?1, ?2)",
storage::USER_ACTIVITY_TABLE
);
let tx = conn.transaction().map_err(|_| Error::DatabaseFailedInternally)?;
match tx.execute(&raw_stats_stmt, params![pubkey, now]) {
Ok(_) => (),
Err(e) => {
println!("failed to update stats: {}.", e);
}
};
// Commit
tx.commit().map_err(|_| Error::DatabaseFailedInternally)?;
}
None => {}
}
}
// Return the messages
return Ok(messages);
}
@ -1031,6 +1060,64 @@ pub async fn get_session_version(platform: &str) -> Result<String, Rejection> {
return Ok(tag.clone());
}
// not publicly exposed.
pub async fn get_stats_for_room(
room: String, query_map: HashMap<String, i64>,
) -> Result<Response, Rejection> {
let now = chrono::Utc::now().timestamp();
let window = match query_map.get("window") {
Some(val) => val,
None => &3600i64,
};
let upperbound = match query_map.get("start") {
Some(val) => val,
None => &now,
};
let lowerbound = upperbound - window;
let pool = storage::pool_by_room_id(&room);
let conn = pool.get().map_err(|_| Error::DatabaseFailedInternally)?;
let raw_query_users = format!(
"SELECT COUNT(public_key) FROM {} WHERE last_active > ?1 AND last_active <= ?2",
storage::USER_ACTIVITY_TABLE
);
let mut query_users =
conn.prepare(&raw_query_users).map_err(|_| Error::DatabaseFailedInternally)?;
let active = match query_users
.query_row(params![lowerbound, upperbound], |row| Ok(row.get::<_, u32>(0)?))
{
Ok(row) => row,
Err(_e) => return Err(warp::reject::custom(Error::DatabaseFailedInternally)),
};
let raw_query_posts = format!(
"SELECT COUNT(id) FROM {} WHERE timestamp > ?1 AND timestamp <= ?2",
storage::MESSAGES_TABLE
);
let mut query_posts =
conn.prepare(&raw_query_posts).map_err(|_| Error::DatabaseFailedInternally)?;
let posts = match query_posts
.query_row(params![lowerbound * 1000, upperbound * 1000], |row| Ok(row.get::<_, u32>(0)?))
{
Ok(row) => row,
Err(_e) => return Err(warp::reject::custom(Error::DatabaseFailedInternally)),
};
// Return value
#[derive(Debug, Deserialize, Serialize)]
struct Response {
posts: u32,
active_users: u32,
}
let response = Response { active_users: active, posts };
return Ok(warp::reply::json(&response).into_response());
}
// Utilities
fn get_pending_tokens(

View File

@ -82,6 +82,7 @@ async fn main() {
.or(routes::delete_room())
.or(routes::add_moderator())
.or(routes::delete_moderator())
.or(routes::get_room_stats())
.or(routes::get_url());
if opt.tls {
info!("Running on {} with TLS.", addr);

View File

@ -68,6 +68,16 @@ pub fn get_url() -> impl Filter<Extract = impl warp::Reply, Error = Rejection> +
return warp::get().and(warp::path("url")).and_then(handlers::get_url);
}
/// GET /stats/:id?window=:seconds
///
/// not publicly exposed
pub fn get_room_stats() -> impl Filter<Extract = impl warp::Reply, Error = Rejection> + Clone {
return warp::get()
.and(warp::path!("stats" / String))
.and(warp::filters::query::query())
.and_then(handlers::get_stats_for_room);
}
pub async fn root_html() -> Result<Response, Rejection> {
let body = r#"
<html>

View File

@ -57,6 +57,8 @@ pub const PENDING_TOKENS_TABLE: &str = "pending_tokens";
pub const TOKENS_TABLE: &str = "tokens";
pub const FILES_TABLE: &str = "files";
pub const USER_ACTIVITY_TABLE: &str = "user_activity";
lazy_static::lazy_static! {
static ref POOLS: Mutex<HashMap<String, DatabaseConnectionPool>> = Mutex::new(HashMap::new());
@ -156,6 +158,15 @@ fn create_room_tables_if_needed(conn: &DatabaseConnection) {
FILES_TABLE
);
conn.execute(&files_table_cmd, params![]).expect("Couldn't create files table.");
let user_activity_table_cmd = format!(
"CREATE TABLE IF NOT EXISTS {} (
public_key TEXT PRIMARY KEY,
last_active INTEGER NOT NULL
)",
USER_ACTIVITY_TABLE,
);
conn.execute(&user_activity_table_cmd, params![]).expect("Couldn't create user activty table.");
}
// Pruning
@ -289,7 +300,17 @@ pub async fn prune_files(file_expiration: i64) {
// Migration
pub fn perform_migration() {
// Do nothing
// ensure all rooms schemas are up to date
let rooms = match get_all_room_ids() {
Ok(ids) => ids,
Err(_e) => {
error!("could not get all room ids");
return;
}
};
for room in rooms {
create_database_if_needed(&room);
}
}
// Utilities