Merge pull request #32 from oxen-io/dev

Add a new endpoint for batch subscribing groups and opt out for 1-1 PNs
This commit is contained in:
RyanZhao 2023-07-24 13:19:34 +10:00 committed by GitHub
commit 39c230f6a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 129 additions and 14 deletions

View File

@ -9,6 +9,7 @@
{
"token": String
"pubKey": String
"device": Optional(String ("ios"/"android"/"huawei"))
}
```
@ -22,6 +23,19 @@
}
```
- Endpoint: `/register_legacy_groups_only`
- Method: **POST**
- Header: `[Content-Type: application/json]`
- Expected Body:
```
{
"token": String
"pubKey": String
"device": Optional(String ("ios"/"android"/"huawei"))
"legacyGroupPublicKeys": [String]
}
```
- Endpoint: `/subscribe_closed_group`
- Method: **POST**
- Header: `[Content-Type: application/json]`

View File

@ -25,6 +25,7 @@ class HTTP:
class SubscriptionRequest:
CLOSED_GROUP = 'closedGroupPublicKey'
PUBKEY = 'pubKey'
CLOSED_GROUPS = 'legacyGroupPublicKeys'
class StatsDataRequest:
START_DATE = 'start_date'

View File

@ -8,9 +8,10 @@ class Device:
PUBKEY = 'pubKey'
TOKEN = 'token'
DEVICE_TYPE = 'device'
LEGACY_GROUPS_ONLY = 'legacy_groups_only'
TABLE = 'token_pubkey_table'
COLUMNS = [Column.PUBKEY, Column.TOKEN, Column.DEVICE_TYPE]
COLUMNS = [Column.PUBKEY, Column.TOKEN, Column.DEVICE_TYPE, Column.LEGACY_GROUPS_ONLY]
CREATE_TABLE = (
f'CREATE TABLE IF NOT EXISTS {TABLE} ('
f' {Column.PUBKEY} TEXT NOT NULL,'
@ -21,6 +22,10 @@ class Device:
f'ALTER TABLE {TABLE}'
f' ADD {Column.DEVICE_TYPE} TEXT'
)
INSERT_FLAG = (
f'ALTER TABLE {TABLE}'
f' ADD {Column.LEGACY_GROUPS_ONLY} BOOL DEFAULT 0 NOT NULL '
)
class Token:
def __init__(self, value, device_type):
@ -38,9 +43,10 @@ class Device:
def __hash__(self):
return str(self.value).__hash__()
def __init__(self, session_id=None):
def __init__(self, session_id=None, legacy_groups_only=False):
self.session_id = session_id
self.tokens = set()
self.legacy_groups_only = legacy_groups_only
self.needs_to_be_updated = False
def to_database_rows(self):
@ -52,7 +58,7 @@ class Device:
if isinstance(token.device_type, str):
LokiLogger().logger.error("Device_Type is String.")
continue
rows.append((self.session_id, token.value, token.device_type.value))
rows.append((self.session_id, token.value, token.device_type.value, self.legacy_groups_only))
self.needs_to_be_updated = False
return rows
@ -70,6 +76,11 @@ class Device:
self.tokens.remove(token_type)
self.needs_to_be_updated = True
def update_legacy_groups_only(self, legacy_groups_only):
if self.legacy_groups_only != legacy_groups_only:
self.legacy_groups_only = legacy_groups_only
self.needs_to_be_updated = True
def save_to_cache(self, db_helper):
db_helper.device_cache[self.session_id] = self
for token in self.tokens:

View File

@ -78,6 +78,29 @@ def unregister(args):
raise Exception(HTTP.Response.PARA_MISSING)
def register_legacy_groups_only(args):
device_token = None
session_id = None
closed_group_ids = []
if HTTP.RegistrationRequest.TOKEN in args:
device_token = args[HTTP.RegistrationRequest.TOKEN]
if HTTP.RegistrationRequest.PUBKEY in args:
session_id = args[HTTP.RegistrationRequest.PUBKEY]
if HTTP.RegistrationRequest.DEVICE_TYPE in args:
device_type = DeviceType(args[HTTP.RegistrationRequest.DEVICE_TYPE])
else:
device_type = DeviceType.iOS if is_ios_device_token(device_token) else DeviceType.Android
if HTTP.SubscriptionRequest.CLOSED_GROUPS in args:
closed_group_ids = args[HTTP.SubscriptionRequest.CLOSED_GROUPS]
if device_token and session_id:
PushNotificationHelperV2().register_legacy_groups_only(device_token, session_id, device_type, closed_group_ids)
return 1, HTTP.Response.SUCCESS
else:
LokiLogger().logger.info("Onion routing register closed groups only error")
raise Exception(HTTP.Response.PARA_MISSING)
def subscribe_closed_group(args):
closed_group_id = None
session_id = None
@ -130,6 +153,7 @@ def notify(args):
Routing = {'register': register_v2,
'unregister': unregister,
'register_legacy_groups_only': register_legacy_groups_only,
'subscribe_closed_group': subscribe_closed_group,
'unsubscribe_closed_group': unsubscribe_closed_group,
'notify': notify}

View File

@ -16,11 +16,12 @@ class SyncDatabaseTask(BaseTask):
async def task(self):
while self.is_running:
try:
for i in range(3 * 60):
await asyncio.sleep(1)
# Check should back up database every second
self.back_up_data_if_needed()
self.create_new_stats_data_entry_if_needed()
for _ in range(3):
for __ in range(60):
await asyncio.sleep(1)
# Check should back up database every second
self.back_up_data_if_needed()
self.create_new_stats_data_entry_if_needed()
# Flush cache to database when the last flush is done
self.database_helper.flush_async()
# Update stats data every 3 minutes

View File

@ -6,4 +6,5 @@ TEST_SESSION_ID_1 = 'test_session_id_1'
TEST_TOKEN_0 = 'test_token_0'
TEST_TOKEN_1 = 'test_token_1'
TEST_CLOSED_GROUP_ID = 'test_closed_group_id'
TEST_CLOSED_GROUP_ID_2 = 'test_closed_group_id_2'
TEST_DATA = 'test_data_bla_bla...'

View File

@ -28,6 +28,7 @@ class DatabaseHelperV2Tests(unittest.TestCase):
test_device = Device()
test_device.session_id = TEST_SESSION_ID
test_device.add_token(Device.Token(TEST_TOKEN_0, DeviceType.Unknown))
test_device.legacy_groups_only = True
test_device.save_to_cache(self.databaseHelper)
test_device_in_cache = self.databaseHelper.get_device(TEST_SESSION_ID)

View File

@ -10,7 +10,8 @@ tests_cases = ['register',
'subscribe_closed_group',
'unsubscribe_closed_group',
'send_push_notification',
'handle_push_fail']
'handle_push_fail',
'register_legacy_groups_only']
class PushNotificationHandlerTests(unittest.TestCase):
@ -100,6 +101,32 @@ class PushNotificationHandlerTests(unittest.TestCase):
test_device_in_cache = self.database_helper.get_device(TEST_SESSION_ID)
self.assertFalse(Device.Token(TEST_TOKEN_0, TEST_DEVICE_TYPE) in test_device_in_cache.tokens)
def test_6_register_legacy_groups_only(self):
self.PN_helper_v2.register(TEST_TOKEN_0, TEST_SESSION_ID, TEST_DEVICE_TYPE)
test_device_in_cache = self.database_helper.get_device(TEST_SESSION_ID)
self.assertFalse(test_device_in_cache.legacy_groups_only)
self.PN_helper_v2.register_legacy_groups_only(TEST_TOKEN_0, TEST_SESSION_ID, TEST_DEVICE_TYPE, [TEST_CLOSED_GROUP_ID])
test_device_in_cache = self.database_helper.get_device(TEST_SESSION_ID)
self.assertTrue(test_device_in_cache.legacy_groups_only)
test_message = {'send_to': TEST_SESSION_ID,
'data': TEST_DATA}
self.PN_helper_v2.add_message_to_queue(test_message)
loop = asyncio.get_event_loop()
coroutine = self.PN_helper_v2.send_push_notification()
loop.run_until_complete(coroutine)
self.assertEqual(self.PN_helper_v2.stats_data.notification_counter_android, 0)
test_closed_group_message = {'send_to': TEST_CLOSED_GROUP_ID,
'data': TEST_DATA}
self.PN_helper_v2.add_message_to_queue(test_closed_group_message)
loop = asyncio.get_event_loop()
coroutine = self.PN_helper_v2.send_push_notification()
loop.run_until_complete(coroutine)
self.assertEqual(self.PN_helper_v2.stats_data.closed_group_messages, 1)
self.assertEqual(self.PN_helper_v2.stats_data.notification_counter_android, 1)
if __name__ == '__main__':
unittest.main()

View File

@ -9,7 +9,8 @@ tests_cases = ['lsrpc',
'notify',
'unregister',
'subscribe_closed_group',
'unsubscribe_closed_group']
'unsubscribe_closed_group',
'register_legacy_groups_only']
class ServerTests(unittest.TestCase):
@ -71,6 +72,16 @@ class ServerTests(unittest.TestCase):
test_closed_group_in_cache = self.database_helper.closed_group_cache.get(TEST_CLOSED_GROUP_ID)
self.assertFalse(TEST_SESSION_ID in test_closed_group_in_cache.members)
def test_7_register_legacy_groups_only(self):
args = {HTTP.RegistrationRequest.TOKEN: TEST_TOKEN_0,
HTTP.RegistrationRequest.PUBKEY: TEST_SESSION_ID,
HTTP.RegistrationRequest.DEVICE_TYPE: TEST_DEVICE_TYPE,
HTTP.SubscriptionRequest.CLOSED_GROUPS: f'[{TEST_CLOSED_GROUP_ID}, {TEST_CLOSED_GROUP_ID_2}]'}
register_legacy_groups_only(args)
self.assertEqual(len(self.database_helper.closed_group_cache), 2)
test_device_in_cache = self.database_helper.device_cache.get(TEST_SESSION_ID)
self.assertTrue(test_device_in_cache.legacy_groups_only)
if __name__ == '__main__':
unittest.main()

View File

@ -19,7 +19,7 @@ class DatabaseHelperV2(metaclass=Singleton):
self.token_device_mapping = {} # {token: Device}
self.closed_group_cache = {} # {closed_group_id: ClosedGroup}
self.create_tables_if_needed()
self.migration_device_type()
self.migration()
self.populate_cache()
# Database backup
@ -66,6 +66,21 @@ class DatabaseHelperV2(metaclass=Singleton):
cursor.close()
db_connection.close()
def migration_flag(self):
db_connection = sqlite3.connect(self.database)
cursor = db_connection.cursor()
try:
cursor.execute(Device.INSERT_FLAG)
db_connection.commit()
except Exception as e:
self.logger.error(e)
cursor.close()
db_connection.close()
def migration(self):
self.migration_device_type()
self.migration_flag()
def populate_cache(self):
db_connection = sqlite3.connect(self.database)
cursor = db_connection.cursor()
@ -78,7 +93,8 @@ class DatabaseHelperV2(metaclass=Singleton):
session_id = row[0]
device_type = DeviceType(row[2]) if row[2] is not None else None
token = Device.Token(row[1], device_type)
device = self.get_device(session_id) or Device(session_id)
legacy_groups_only = bool(row[3]) if row[3] is not None else False
device = self.get_device(session_id) or Device(session_id, legacy_groups_only)
device.tokens.add(token) # Won't trigger needs_to_be_updated
self.device_cache[session_id] = device
self.token_device_mapping[token.value] = device

View File

@ -48,12 +48,13 @@ class PushNotificationHelperV2(metaclass=Singleton):
return device.session_id
return None
def register(self, device_token, session_id, device_type):
def register(self, device_token, session_id, device_type, legacy_groups_only=False):
self.latest_activity_timestamp[session_id] = time.time()
if device_token in self.database_helper.token_device_mapping.keys():
device = self.database_helper.token_device_mapping[device_token]
if device.session_id == session_id:
device.update_legacy_groups_only(legacy_groups_only)
return
else:
self.remove_device_token(device_token)
@ -65,6 +66,8 @@ class PushNotificationHelperV2(metaclass=Singleton):
device = Device()
device.session_id = session_id
device.update_legacy_groups_only(legacy_groups_only)
# When an existed session id adds a new device
device.add_token(Device.Token(device_token, device_type))
device.save_to_cache(self.database_helper)
@ -74,6 +77,11 @@ class PushNotificationHelperV2(metaclass=Singleton):
session_id = self.remove_device_token(device_token)
return session_id
def register_legacy_groups_only(self, device_token, session_id, device_type, closed_group_ids):
self.register(device_token, session_id, device_type, True)
for closed_group_id in closed_group_ids:
self.subscribe_closed_group(closed_group_id, session_id)
def subscribe_closed_group(self, closed_group_id, session_id):
self.latest_activity_timestamp[session_id] = time.time()
@ -171,7 +179,7 @@ class PushNotificationHelperV2(metaclass=Singleton):
recipient = message[HTTP.NotificationRequest.SEND_TO]
device = self.database_helper.get_device(recipient)
closed_group = self.database_helper.get_closed_group(recipient)
if device:
if device and not device.legacy_groups_only:
self.stats_data.increment_deduplicated_one_on_one_message(1)
generate_notifications([recipient])
elif closed_group: