Merge pull request #1102 from mpretty-cyro/fix/non-blocking-migration-and-extras

Fixed a few bugs, added logging and removed some old code
This commit is contained in:
Morgan Pretty 2023-02-06 16:00:36 +11:00 committed by GitHub
commit 40f315af8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 209 additions and 633 deletions

View File

@ -64,7 +64,6 @@ import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import org.thoughtcrime.securesms.dependencies.DatabaseModule;
import org.thoughtcrime.securesms.emoji.EmojiSource;
import org.thoughtcrime.securesms.groups.OpenGroupManager;
import org.thoughtcrime.securesms.groups.OpenGroupMigrator;
import org.thoughtcrime.securesms.home.HomeActivity;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
@ -206,9 +205,6 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
storage,
messageDataProvider,
()-> KeyPairUtilities.INSTANCE.getUserED25519KeyPair(this));
// migrate session open group data
OpenGroupMigrator.migrate(getDatabaseComponent());
// end migration
callMessageProcessor = new CallMessageProcessor(this, textSecurePreferences, ProcessLifecycleOwner.get().getLifecycle(), storage);
Log.i(TAG, "onCreate()");
startKovenant();

View File

@ -210,8 +210,7 @@ public class PassphrasePromptActivity extends BaseActionBarActivity {
try {
signature = biometricSecretProvider.getOrCreateBiometricSignature(this);
hasSignatureObject = true;
throw new InvalidKeyException("e");
} catch (InvalidKeyException e) {
} catch (Exception e) {
signature = null;
hasSignatureObject = false;
Log.e(TAG, "Error getting / creating signature", e);

View File

@ -963,6 +963,18 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show()
}
override fun copyOpenGroupUrl(thread: Recipient) {
if (!thread.isOpenGroupRecipient) { return }
val threadId = threadDb.getThreadIdIfExistsFor(thread) ?: return
val openGroup = lokiThreadDb.getOpenGroupChat(threadId) ?: return
val clip = ClipData.newPlainText("Community URL", openGroup.joinURL)
val manager = getSystemService(PassphraseRequiredActionBarActivity.CLIPBOARD_SERVICE) as ClipboardManager
manager.setPrimaryClip(clip)
Toast.makeText(this, R.string.copied_to_clipboard, Toast.LENGTH_SHORT).show()
}
override fun showExpiringMessagesDialog(thread: Recipient) {
if (thread.isClosedGroupRecipient) {
val group = groupDb.getGroup(thread.address.toGroupString()).orNull()

View File

@ -78,6 +78,10 @@ object ConversationMenuHelper {
inflater.inflate(R.menu.menu_conversation_expiration_off, menu)
}
}
// One-on-one chat menu allows copying the session id
if (thread.isContactRecipient) {
inflater.inflate(R.menu.menu_conversation_copy_session_id, menu)
}
// One-on-one chat menu (options that should only be present for one-on-one chats)
if (thread.isContactRecipient) {
if (thread.isBlocked) {
@ -154,6 +158,7 @@ object ConversationMenuHelper {
R.id.menu_block -> { block(context, thread, deleteThread = false) }
R.id.menu_block_delete -> { blockAndDelete(context, thread) }
R.id.menu_copy_session_id -> { copySessionID(context, thread) }
R.id.menu_copy_open_group_url -> { copyOpenGroupUrl(context, thread) }
R.id.menu_edit_group -> { editClosedGroup(context, thread) }
R.id.menu_leave_group -> { leaveClosedGroup(context, thread) }
R.id.menu_invite_to_open_group -> { inviteContacts(context, thread) }
@ -270,6 +275,12 @@ object ConversationMenuHelper {
listener.copySessionID(thread.address.toString())
}
private fun copyOpenGroupUrl(context: Context, thread: Recipient) {
if (!thread.isOpenGroupRecipient) { return }
val listener = context as? ConversationMenuListener ?: return
listener.copyOpenGroupUrl(thread)
}
private fun editClosedGroup(context: Context, thread: Recipient) {
if (!thread.isClosedGroupRecipient) { return }
val intent = Intent(context, EditClosedGroupActivity::class.java)
@ -344,6 +355,7 @@ object ConversationMenuHelper {
fun block(deleteThread: Boolean = false)
fun unblock()
fun copySessionID(sessionId: String)
fun copyOpenGroupUrl(thread: Recipient)
fun showExpiringMessagesDialog(thread: Recipient)
}

View File

@ -57,7 +57,6 @@ import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.MmsMessageRecord;
import org.thoughtcrime.securesms.database.model.ThreadRecord;
import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import org.thoughtcrime.securesms.groups.OpenGroupMigrator;
import org.thoughtcrime.securesms.mms.Slide;
import org.thoughtcrime.securesms.mms.SlideDeck;
import org.thoughtcrime.securesms.notifications.MarkReadReceiver;
@ -800,77 +799,6 @@ public class ThreadDatabase extends Database {
return query;
}
@NotNull
public List<ThreadRecord> getHttpOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.HTTP_PREFIX+OpenGroupMigrator.OPEN_GET_SESSION_TRAILING_DOT_ENCODED +"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
@NotNull
public List<ThreadRecord> getLegacyOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.LEGACY_GROUP_ENCODED_ID+"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
@NotNull
public List<ThreadRecord> getHttpsOxenOpenGroups() {
String where = TABLE_NAME+"."+ADDRESS+" LIKE ?";
String selection = OpenGroupMigrator.NEW_GROUP_ENCODED_ID+"%";
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = createQuery(where, 0);
Cursor cursor = db.rawQuery(query, new String[]{selection});
if (cursor == null) {
return Collections.emptyList();
}
List<ThreadRecord> threads = new ArrayList<>();
try {
Reader reader = readerFor(cursor);
ThreadRecord record;
while ((record = reader.getNext()) != null) {
threads.add(record);
}
} finally {
cursor.close();
}
return threads;
}
public void migrateEncodedGroup(long threadId, @NotNull String newEncodedGroupId) {
ContentValues contentValues = new ContentValues(1);
contentValues.put(ADDRESS, newEncodedGroupId);

View File

@ -97,25 +97,40 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
private final DatabaseSecret databaseSecret;
public SQLCipherOpenHelper(@NonNull Context context, @NonNull DatabaseSecret databaseSecret) {
super(context, DATABASE_NAME, databaseSecret.asString(), null, DATABASE_VERSION, MIN_DATABASE_VERSION, null, new SQLiteDatabaseHook() {
@Override
public void preKey(SQLiteConnection connection) {
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
}
@Override
public void postKey(SQLiteConnection connection) {
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
// if not vacuumed in a while, perform that operation
long currentTime = System.currentTimeMillis();
// 7 days
if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) {
connection.execute("VACUUM;", null, null);
TextSecurePreferences.setLastVacuumNow(context);
super(
context,
DATABASE_NAME,
databaseSecret.asString(),
null,
DATABASE_VERSION,
MIN_DATABASE_VERSION,
null,
new SQLiteDatabaseHook() {
@Override
public void preKey(SQLiteConnection connection) {
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
}
}
}, true);
@Override
public void postKey(SQLiteConnection connection) {
SQLCipherOpenHelper.applySQLCipherPragmas(connection, true);
// if not vacuumed in a while, perform that operation
long currentTime = System.currentTimeMillis();
// 7 days
if (currentTime - TextSecurePreferences.getLastVacuumTime(context) > 604_800_000) {
connection.execute("VACUUM;", null, null);
TextSecurePreferences.setLastVacuumNow(context);
}
}
},
// Note: Now that we support concurrent database reads the migrations are actually non-blocking
// because of this we need to initially open the database with writeAheadLogging (WAL mode) disabled
// and enable it once the database officially opens it's connection (which will cause it to re-connect
// in WAL mode) - this is a little inefficient but will prevent SQL-related errors/crashes due to
// incomplete migrations
false
);
this.context = context.getApplicationContext();
this.databaseSecret = databaseSecret;
@ -150,11 +165,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
// If the old SQLCipher3 database file doesn't exist then no need to do anything
if (!oldDbFile.exists()) { return; }
try {
// Define the location for the new database
String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath();
File newDbFile = new File(newDbPath);
// Define the location for the new database
String newDbPath = context.getDatabasePath(DATABASE_NAME).getPath();
File newDbFile = new File(newDbPath);
try {
// If the new database file already exists then check if it's valid first, if it's in an
// invalid state we should delete it and try to migrate again
if (newDbFile.exists()) {
@ -162,10 +177,24 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
// assume the user hasn't downgraded for some reason and made changes to the old database and
// can remove the old database file (it won't be used anymore)
if (oldDbFile.lastModified() <= newDbFile.lastModified()) {
// TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past
// //noinspection ResultOfMethodCallIgnored
// oldDbFile.delete();
return;
try {
SQLiteDatabase newDb = SQLCipherOpenHelper.open(newDbPath, databaseSecret, true);
int version = newDb.getVersion();
newDb.close();
// Make sure the new database has it's version set correctly (if not then the migration didn't
// fully succeed and the database will try to create all it's tables and immediately fail so
// we will need to remove and remigrate)
if (version > 0) {
// TODO: Delete 'CIPHER3_DATABASE_NAME' once enough time has past
// //noinspection ResultOfMethodCallIgnored
// oldDbFile.delete();
return;
}
}
catch (Exception e) {
Log.i(TAG, "Failed to retrieve version from new database, assuming invalid and remigrating");
}
}
// If the old database does have newer changes then the new database could have stale/invalid
@ -207,6 +236,11 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
catch (Exception e) {
Log.e(TAG, "Migration from SQLCipher3 to SQLCipher4 failed", e);
// If an exception was thrown then we should remove the new database file (it's probably invalid)
if (!newDbFile.delete()) {
Log.e(TAG, "Unable to delete invalid new database file");
}
// Notify the user of the issue so they know they can downgrade until the issue is fixed
NotificationManager notificationManager = context.getSystemService(NotificationManager.class);
String channelId = context.getString(R.string.NotificationChannel_failures);
@ -559,6 +593,15 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
}
}
@Override
public void onOpen(SQLiteDatabase db) {
super.onOpen(db);
// Now that the database is officially open (ie. the migrations are completed) we want to enable
// write ahead logging (WAL mode) to officially support concurrent read connections
db.enableWriteAheadLogging();
}
public void markCurrent(SQLiteDatabase db) {
db.setVersion(DATABASE_VERSION);
}

View File

@ -1,139 +0,0 @@
package org.thoughtcrime.securesms.groups
import androidx.annotation.VisibleForTesting
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.utilities.Hex
import org.thoughtcrime.securesms.database.model.ThreadRecord
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
object OpenGroupMigrator {
const val HTTP_PREFIX = "__loki_public_chat_group__!687474703a2f2f"
private const val HTTPS_PREFIX = "__loki_public_chat_group__!68747470733a2f2f"
const val OPEN_GET_SESSION_TRAILING_DOT_ENCODED = "6f70656e2e67657473657373696f6e2e6f72672e"
const val LEGACY_GROUP_ENCODED_ID = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e" // old IP based toByteArray()
const val NEW_GROUP_ENCODED_ID = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e" // new URL based toByteArray()
data class OpenGroupMapping(val stub: String, val legacyThreadId: Long, val newThreadId: Long?)
@VisibleForTesting
fun Recipient.roomStub(): String? {
if (!isOpenGroupRecipient) return null
val serialized = address.serialize()
if (serialized.startsWith(LEGACY_GROUP_ENCODED_ID)) {
return serialized.replace(LEGACY_GROUP_ENCODED_ID,"")
} else if (serialized.startsWith(NEW_GROUP_ENCODED_ID)) {
return serialized.replace(NEW_GROUP_ENCODED_ID,"")
} else if (serialized.startsWith(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED)) {
return serialized.replace(HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED, "")
}
return null
}
@VisibleForTesting
fun getExistingMappings(legacy: List<ThreadRecord>, new: List<ThreadRecord>): List<OpenGroupMapping> {
val legacyStubsMapping = legacy.mapNotNull { thread ->
val stub = thread.recipient.roomStub()
stub?.let { it to thread.threadId }
}
val newStubsMapping = new.mapNotNull { thread ->
val stub = thread.recipient.roomStub()
stub?.let { it to thread.threadId }
}
return legacyStubsMapping.map { (legacyEncodedStub, legacyId) ->
// get 'new' open group thread ID if stubs match
OpenGroupMapping(
legacyEncodedStub,
legacyId,
newStubsMapping.firstOrNull { (newEncodedStub, _) -> newEncodedStub == legacyEncodedStub }?.second
)
}
}
@JvmStatic
fun migrate(databaseComponent: DatabaseComponent) {
// migrate thread db
val threadDb = databaseComponent.threadDatabase()
val legacyOpenGroups = threadDb.legacyOxenOpenGroups
val httpBasedNewGroups = threadDb.httpOxenOpenGroups
if (legacyOpenGroups.isEmpty() && httpBasedNewGroups.isEmpty()) return // no need to migrate
val newOpenGroups = threadDb.httpsOxenOpenGroups
val firstStepMigration = getExistingMappings(legacyOpenGroups, newOpenGroups)
val secondStepMigration = getExistingMappings(httpBasedNewGroups, newOpenGroups)
val groupDb = databaseComponent.groupDatabase()
val lokiApiDb = databaseComponent.lokiAPIDatabase()
val smsDb = databaseComponent.smsDatabase()
val mmsDb = databaseComponent.mmsDatabase()
val lokiMessageDatabase = databaseComponent.lokiMessageDatabase()
val lokiThreadDatabase = databaseComponent.lokiThreadDatabase()
firstStepMigration.forEach { (stub, old, new) ->
val legacyEncodedGroupId = LEGACY_GROUP_ENCODED_ID+stub
if (new == null) {
val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub
// migrate thread and group encoded values
threadDb.migrateEncodedGroup(old, newEncodedGroupId)
groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId)
// migrate Loki API DB values
// decode the hex to bytes, decode byte array to string i.e. "oxen" or "session"
val decodedStub = Hex.fromStringCondensed(stub).decodeToString()
val legacyLokiServerId = "${OpenGroupApi.legacyDefaultServer}.$decodedStub"
val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub"
lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId)
// migrate loki thread db server info
val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old)
val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId)
lokiThreadDatabase.setOpenGroupChat(newServerInfo, old)
} else {
// has a legacy and a new one
// migrate SMS and MMS tables
smsDb.migrateThreadId(old, new)
mmsDb.migrateThreadId(old, new)
lokiMessageDatabase.migrateThreadId(old, new)
// delete group for legacy ID
groupDb.delete(legacyEncodedGroupId)
// delete thread for legacy ID
threadDb.deleteConversation(old)
lokiThreadDatabase.removeOpenGroupChat(old)
}
// maybe migrate jobs here
}
secondStepMigration.forEach { (stub, old, new) ->
val legacyEncodedGroupId = HTTP_PREFIX + OPEN_GET_SESSION_TRAILING_DOT_ENCODED + stub
if (new == null) {
val newEncodedGroupId = NEW_GROUP_ENCODED_ID+stub
// migrate thread and group encoded values
threadDb.migrateEncodedGroup(old, newEncodedGroupId)
groupDb.migrateEncodedGroup(legacyEncodedGroupId, newEncodedGroupId)
// migrate Loki API DB values
// decode the hex to bytes, decode byte array to string i.e. "oxen" or "session"
val decodedStub = Hex.fromStringCondensed(stub).decodeToString()
val legacyLokiServerId = "${OpenGroupApi.httpDefaultServer}.$decodedStub"
val newLokiServerId = "${OpenGroupApi.defaultServer}.$decodedStub"
lokiApiDb.migrateLegacyOpenGroup(legacyLokiServerId, newLokiServerId)
// migrate loki thread db server info
val oldServerInfo = lokiThreadDatabase.getOpenGroupChat(old)
val newServerInfo = oldServerInfo!!.copy(server = OpenGroupApi.defaultServer, id = newLokiServerId)
lokiThreadDatabase.setOpenGroupChat(newServerInfo, old)
} else {
// has a legacy and a new one
// migrate SMS and MMS tables
smsDb.migrateThreadId(old, new)
mmsDb.migrateThreadId(old, new)
lokiMessageDatabase.migrateThreadId(old, new)
// delete group for legacy ID
groupDb.delete(legacyEncodedGroupId)
// delete thread for legacy ID
threadDb.deleteConversation(old)
lokiThreadDatabase.removeOpenGroupChat(old)
}
// maybe migrate jobs here
}
}
}

View File

@ -21,26 +21,26 @@ public class PushMediaConstraints extends MediaConstraints {
@Override
public int getImageMaxSize(Context context) {
return (int) (((double) FileServerApi.maxFileSize) / FileServerApi.fileSizeORMultiplier);
return FileServerApi.maxFileSize;
}
@Override
public int getGifMaxSize(Context context) {
return (int) (((double) FileServerApi.maxFileSize) / FileServerApi.fileSizeORMultiplier);
return FileServerApi.maxFileSize;
}
@Override
public int getVideoMaxSize(Context context) {
return (int) (((double) FileServerApi.maxFileSize) / FileServerApi.fileSizeORMultiplier);
return FileServerApi.maxFileSize;
}
@Override
public int getAudioMaxSize(Context context) {
return (int) (((double) FileServerApi.maxFileSize) / FileServerApi.fileSizeORMultiplier);
return FileServerApi.maxFileSize;
}
@Override
public int getDocumentMaxSize(Context context) {
return (int) (((double) FileServerApi.maxFileSize) / FileServerApi.fileSizeORMultiplier);
return FileServerApi.maxFileSize;
}
}

View File

@ -60,7 +60,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
// FIXME: Using a job here seems like a bad idea...
MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
}
BatchMessageReceiveJob(params).executeAsync()
BatchMessageReceiveJob(params).executeAsync("background")
}
promises.add(dmsPromise)

View File

@ -2,6 +2,10 @@
<menu
xmlns:android="http://schemas.android.com/apk/res/android">
<item
android:title="@string/ConversationActivity_copy_open_group_url"
android:id="@+id/menu_copy_open_group_url" />
<item
android:title="@string/ConversationActivity_invite_to_open_group"
android:id="@+id/menu_invite_to_open_group" />

View File

@ -77,6 +77,7 @@
<string name="ConversationActivity_attachment_exceeds_size_limits">Attachment exceeds size limits for the type of message you\'re sending.</string>
<string name="ConversationActivity_unable_to_record_audio">Unable to record audio!</string>
<string name="ConversationActivity_there_is_no_app_available_to_handle_this_link_on_your_device">There is no app available to handle this link on your device.</string>
<string name="ConversationActivity_copy_open_group_url">Copy Community URL</string>
<string name="ConversationActivity_invite_to_open_group">Add members</string>
<string name="ConversationActivity_to_send_audio_messages_allow_signal_access_to_your_microphone">Session needs microphone access to send audio messages.</string>
<string name="ConversationActivity_signal_requires_the_microphone_permission_in_order_to_send_audio_messages">Session needs microphone access to send audio messages, but it has been permanently denied. Please continue to app settings, select \"Permissions\", and enable \"Microphone\".</string>

View File

@ -1,281 +0,0 @@
package org.thoughtcrime.securesms.util
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import org.mockito.kotlin.KStubbing
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.recipients.Recipient
import org.thoughtcrime.securesms.database.GroupDatabase
import org.thoughtcrime.securesms.database.LokiAPIDatabase
import org.thoughtcrime.securesms.database.LokiMessageDatabase
import org.thoughtcrime.securesms.database.LokiThreadDatabase
import org.thoughtcrime.securesms.database.MmsDatabase
import org.thoughtcrime.securesms.database.SmsDatabase
import org.thoughtcrime.securesms.database.ThreadDatabase
import org.thoughtcrime.securesms.database.model.ThreadRecord
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import org.thoughtcrime.securesms.groups.OpenGroupMigrator
import org.thoughtcrime.securesms.groups.OpenGroupMigrator.OpenGroupMapping
import org.thoughtcrime.securesms.groups.OpenGroupMigrator.roomStub
class OpenGroupMigrationTests {
companion object {
const val EXAMPLE_LEGACY_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!687474703a2f2f3131362e3230332e37302e33332e6f78656e"
const val EXAMPLE_NEW_ENCODED_OPEN_GROUP = "__loki_public_chat_group__!68747470733a2f2f6f70656e2e67657473657373696f6e2e6f72672e6f78656e"
const val OXEN_STUB_HEX = "6f78656e"
const val EXAMPLE_LEGACY_SERVER_ID = "http://116.203.70.33.oxen"
const val EXAMPLE_NEW_SERVER_ID = "https://open.getsession.org.oxen"
const val LEGACY_THREAD_ID = 1L
const val NEW_THREAD_ID = 2L
}
private fun legacyOpenGroupRecipient(additionalMocks: ((KStubbing<Recipient>) -> Unit) ? = null) = mock<Recipient> {
on { address } doReturn Address.fromSerialized(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP)
on { isOpenGroupRecipient } doReturn true
additionalMocks?.let { it(this) }
}
private fun newOpenGroupRecipient(additionalMocks: ((KStubbing<Recipient>) -> Unit) ? = null) = mock<Recipient> {
on { address } doReturn Address.fromSerialized(EXAMPLE_NEW_ENCODED_OPEN_GROUP)
on { isOpenGroupRecipient } doReturn true
additionalMocks?.let { it(this) }
}
private fun legacyThreadRecord(additionalRecipientMocks: ((KStubbing<Recipient>) -> Unit) ? = null, additionalThreadMocks: ((KStubbing<ThreadRecord>) -> Unit)? = null) = mock<ThreadRecord> {
val returnedRecipient = legacyOpenGroupRecipient(additionalRecipientMocks)
on { recipient } doReturn returnedRecipient
on { threadId } doReturn LEGACY_THREAD_ID
}
private fun newThreadRecord(additionalRecipientMocks: ((KStubbing<Recipient>) -> Unit)? = null, additionalThreadMocks: ((KStubbing<ThreadRecord>) -> Unit)? = null) = mock<ThreadRecord> {
val returnedRecipient = newOpenGroupRecipient(additionalRecipientMocks)
on { recipient } doReturn returnedRecipient
on { threadId } doReturn NEW_THREAD_ID
}
@Test
fun `it should generate the correct room stubs for legacy groups`() {
val mockRecipient = legacyOpenGroupRecipient()
assertEquals(OXEN_STUB_HEX, mockRecipient.roomStub())
}
@Test
fun `it should generate the correct room stubs for new groups`() {
val mockNewRecipient = newOpenGroupRecipient()
assertEquals(OXEN_STUB_HEX, mockNewRecipient.roomStub())
}
@Test
fun `it should return correct mappings`() {
val legacyThread = legacyThreadRecord()
val newThread = newThreadRecord()
val expectedMapping = listOf(
OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, NEW_THREAD_ID)
)
assertTrue(expectedMapping.containsAll(OpenGroupMigrator.getExistingMappings(listOf(legacyThread), listOf(newThread))))
}
@Test
fun `it should return no mappings if there are no legacy open groups`() {
val mappings = OpenGroupMigrator.getExistingMappings(listOf(), listOf())
assertTrue(mappings.isEmpty())
}
@Test
fun `it should return no mappings if there are only new open groups`() {
val newThread = newThreadRecord()
val mappings = OpenGroupMigrator.getExistingMappings(emptyList(), listOf(newThread))
assertTrue(mappings.isEmpty())
}
@Test
fun `it should return null new thread in mappings if there are only legacy open groups`() {
val legacyThread = legacyThreadRecord()
val mappings = OpenGroupMigrator.getExistingMappings(listOf(legacyThread), emptyList())
val expectedMappings = listOf(
OpenGroupMapping(OXEN_STUB_HEX, LEGACY_THREAD_ID, null)
)
assertTrue(expectedMappings.containsAll(mappings))
}
@Test
fun `test migration thread DB calls legacy and returns if no legacy official groups`() {
val mockedThreadDb = mock<ThreadDatabase> {
on { legacyOxenOpenGroups } doReturn emptyList()
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
verify(mockedDbComponent).threadDatabase()
verify(mockedThreadDb).legacyOxenOpenGroups
verifyNoMoreInteractions(mockedThreadDb)
}
@Test
fun `it should migrate on thread, group and loki dbs with correct values for legacy only migration`() {
// mock threadDB
val capturedThreadId = argumentCaptor<Long>()
val capturedNewEncoded = argumentCaptor<String>()
val mockedThreadDb = mock<ThreadDatabase> {
val legacyThreadRecord = legacyThreadRecord()
on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord)
on { httpsOxenOpenGroups } doReturn emptyList()
on { migrateEncodedGroup(capturedThreadId.capture(), capturedNewEncoded.capture()) } doAnswer {}
}
// mock groupDB
val capturedGroupLegacyEncoded = argumentCaptor<String>()
val capturedGroupNewEncoded = argumentCaptor<String>()
val mockedGroupDb = mock<GroupDatabase> {
on {
migrateEncodedGroup(
capturedGroupLegacyEncoded.capture(),
capturedGroupNewEncoded.capture()
)
} doAnswer {}
}
// mock LokiAPIDB
val capturedLokiLegacyGroup = argumentCaptor<String>()
val capturedLokiNewGroup = argumentCaptor<String>()
val mockedLokiApi = mock<LokiAPIDatabase> {
on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {}
}
val pubKey = OpenGroupApi.defaultServerPublicKey
val room = "oxen"
val legacyServer = OpenGroupApi.legacyDefaultServer
val newServer = OpenGroupApi.defaultServer
val lokiThreadOpenGroup = argumentCaptor<OpenGroup>()
val mockedLokiThreadDb = mock<LokiThreadDatabase> {
on { getOpenGroupChat(eq(LEGACY_THREAD_ID)) } doReturn OpenGroup(legacyServer, room, "Oxen", 0, pubKey)
on { setOpenGroupChat(lokiThreadOpenGroup.capture(), eq(LEGACY_THREAD_ID)) } doAnswer {}
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
on { groupDatabase() } doReturn mockedGroupDb
on { lokiAPIDatabase() } doReturn mockedLokiApi
on { lokiThreadDatabase() } doReturn mockedLokiThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
// expect threadDB migration to reflect new thread values:
// thread ID = 1, encoded ID = new encoded ID
assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue)
assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedNewEncoded.firstValue)
// expect groupDB migration to reflect new thread values:
// legacy encoded ID, new encoded ID
assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue)
assertEquals(EXAMPLE_NEW_ENCODED_OPEN_GROUP, capturedGroupNewEncoded.firstValue)
// expect Loki API DB migration to reflect new thread values:
assertEquals("${OpenGroupApi.legacyDefaultServer}.oxen", capturedLokiLegacyGroup.firstValue)
assertEquals("${OpenGroupApi.defaultServer}.oxen", capturedLokiNewGroup.firstValue)
assertEquals(newServer, lokiThreadOpenGroup.firstValue.server)
}
@Test
fun `it should migrate and delete legacy thread with conflicting new and old values`() {
// mock threadDB
val capturedThreadId = argumentCaptor<Long>()
val mockedThreadDb = mock<ThreadDatabase> {
val legacyThreadRecord = legacyThreadRecord()
val newThreadRecord = newThreadRecord()
on { legacyOxenOpenGroups } doReturn listOf(legacyThreadRecord)
on { httpsOxenOpenGroups } doReturn listOf(newThreadRecord)
on { deleteConversation(capturedThreadId.capture()) } doAnswer {}
}
// mock groupDB
val capturedGroupLegacyEncoded = argumentCaptor<String>()
val mockedGroupDb = mock<GroupDatabase> {
on { delete(capturedGroupLegacyEncoded.capture()) } doReturn true
}
// mock LokiAPIDB
val capturedLokiLegacyGroup = argumentCaptor<String>()
val capturedLokiNewGroup = argumentCaptor<String>()
val mockedLokiApi = mock<LokiAPIDatabase> {
on { migrateLegacyOpenGroup(capturedLokiLegacyGroup.capture(), capturedLokiNewGroup.capture()) } doAnswer {}
}
// mock messaging dbs
val migrateMmsFromThreadId = argumentCaptor<Long>()
val migrateMmsToThreadId = argumentCaptor<Long>()
val mockedMmsDb = mock<MmsDatabase> {
on { migrateThreadId(migrateMmsFromThreadId.capture(), migrateMmsToThreadId.capture()) } doAnswer {}
}
val migrateSmsFromThreadId = argumentCaptor<Long>()
val migrateSmsToThreadId = argumentCaptor<Long>()
val mockedSmsDb = mock<SmsDatabase> {
on { migrateThreadId(migrateSmsFromThreadId.capture(), migrateSmsToThreadId.capture()) } doAnswer {}
}
val lokiFromThreadId = argumentCaptor<Long>()
val lokiToThreadId = argumentCaptor<Long>()
val mockedLokiMessageDatabase = mock<LokiMessageDatabase> {
on { migrateThreadId(lokiFromThreadId.capture(), lokiToThreadId.capture()) } doAnswer {}
}
val mockedLokiThreadDb = mock<LokiThreadDatabase> {
on { removeOpenGroupChat(eq(LEGACY_THREAD_ID)) } doAnswer {}
}
val mockedDbComponent = mock<DatabaseComponent> {
on { threadDatabase() } doReturn mockedThreadDb
on { groupDatabase() } doReturn mockedGroupDb
on { lokiAPIDatabase() } doReturn mockedLokiApi
on { mmsDatabase() } doReturn mockedMmsDb
on { smsDatabase() } doReturn mockedSmsDb
on { lokiMessageDatabase() } doReturn mockedLokiMessageDatabase
on { lokiThreadDatabase() } doReturn mockedLokiThreadDb
}
OpenGroupMigrator.migrate(mockedDbComponent)
// should delete thread by thread ID
assertEquals(LEGACY_THREAD_ID, capturedThreadId.firstValue)
// should delete group by legacy encoded ID
assertEquals(EXAMPLE_LEGACY_ENCODED_OPEN_GROUP, capturedGroupLegacyEncoded.firstValue)
// should migrate SMS from legacy thread ID to new thread ID
assertEquals(LEGACY_THREAD_ID, migrateSmsFromThreadId.firstValue)
assertEquals(NEW_THREAD_ID, migrateSmsToThreadId.firstValue)
// should migrate MMS from legacy thread ID to new thread ID
assertEquals(LEGACY_THREAD_ID, migrateMmsFromThreadId.firstValue)
assertEquals(NEW_THREAD_ID, migrateMmsToThreadId.firstValue)
}
}

View File

@ -16,15 +16,6 @@ object FileServerApi {
private const val serverPublicKey = "da21e1d886c6fbaea313f75298bd64aab03a97ce985b46bb2dad9f2089c8ee59"
const val server = "http://filev2.getsession.org"
const val maxFileSize = 10_000_000 // 10 MB
/**
* The file server has a file size limit of `maxFileSize`, which the Service Nodes try to enforce as well. However, the limit applied by the Service Nodes
* is on the **HTTP request** and not the actual file size. Because the file server expects the file data to be base 64 encoded, the size of the HTTP
* request for a given file will be at least `ceil(n / 3) * 4` bytes, where n is the file size in bytes. This is the minimum size because there might also
* be other parameters in the request. On average the multiplier appears to be about 1.5, so when checking whether the file will exceed the file size limit when
* uploading a file we just divide the size of the file by this number. The alternative would be to actually check the size of the HTTP request but that's only
* possible after proof of work has been calculated and the onion request encryption has happened, which takes several seconds.
*/
const val fileSizeORMultiplier = 2 // TODO: It should be possible to set this to 1.5?
sealed class Error(message: String) : Exception(message) {
object ParsingFailed : Error("Invalid response.")

View File

@ -42,7 +42,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id"
}
override fun execute() {
override fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val threadID = storage.getThreadIdForMms(databaseMessageID)
@ -59,7 +59,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "Setting attachment state = failed, don't have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
}
this.handlePermanentFailure(exception)
this.handlePermanentFailure(dispatcherName, exception)
} else if (exception == Error.DuplicateData) {
attachment?.let { id ->
Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data")
@ -68,7 +68,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "Setting attachment state = done from duplicate data")
messageDataProvider.setAttachmentState(AttachmentState.DONE, AttachmentId(attachmentID,0), databaseMessageID)
}
this.handleSuccess()
this.handleSuccess(dispatcherName)
} else {
if (failureCount + 1 >= maxFailureCount) {
attachment?.let { id ->
@ -79,7 +79,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
}
}
this.handleFailure(exception)
this.handleFailure(dispatcherName, exception)
}
}
@ -150,7 +150,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.d("AttachmentDownloadJob", "deleting tempfile")
tempFile.delete()
Log.d("AttachmentDownloadJob", "succeeding job")
handleSuccess()
handleSuccess(dispatcherName)
} catch (e: Exception) {
Log.e("AttachmentDownloadJob", "Error processing attachment download", e)
tempFile?.delete()
@ -169,17 +169,17 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
}
private fun handleSuccess() {
private fun handleSuccess(dispatcherName: String) {
Log.w("AttachmentDownloadJob", "Attachment downloaded successfully.")
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handlePermanentFailure(e: Exception) {
delegate?.handleJobFailedPermanently(this, e)
private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailedPermanently(this, dispatcherName, e)
}
private fun handleFailure(e: Exception) {
delegate?.handleJobFailed(this, e)
private fun handleFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailed(this, dispatcherName, e)
}
private fun createTempFile(): File {

View File

@ -45,29 +45,29 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id"
}
override fun execute() {
override fun execute(dispatcherName: String) {
try {
val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val attachment = messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
?: return handleFailure(Error.NoAttachment)
?: return handleFailure(dispatcherName, Error.NoAttachment)
val openGroup = storage.getOpenGroup(threadID.toLong())
if (openGroup != null) {
val keyAndResult = upload(attachment, openGroup.server, false) {
OpenGroupApi.upload(it, openGroup.room, openGroup.server)
}
handleSuccess(attachment, keyAndResult.first, keyAndResult.second)
handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second)
} else {
val keyAndResult = upload(attachment, FileServerApi.server, true) {
FileServerApi.upload(it)
}
handleSuccess(attachment, keyAndResult.first, keyAndResult.second)
handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second)
}
} catch (e: java.lang.Exception) {
if (e == Error.NoAttachment) {
this.handlePermanentFailure(e)
this.handlePermanentFailure(dispatcherName, e)
} else {
this.handleFailure(e)
this.handleFailure(dispatcherName, e)
}
}
}
@ -104,9 +104,9 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
return Pair(key, UploadResult(id, "${server}/file/$id", digest))
}
private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) {
private fun handleSuccess(dispatcherName: String, attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: UploadResult) {
Log.d(TAG, "Attachment uploaded successfully.")
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
messageDataProvider.handleSuccessfulAttachmentUpload(attachmentID, attachment, attachmentKey, uploadResult)
if (attachment.contentType.startsWith("audio/")) {
@ -144,16 +144,16 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
storage.resumeMessageSendJobIfNeeded(messageSendJobID)
}
private fun handlePermanentFailure(e: Exception) {
private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
Log.w(TAG, "Attachment upload failed permanently due to error: $this.")
delegate?.handleJobFailedPermanently(this, e)
delegate?.handleJobFailedPermanently(this, dispatcherName, e)
MessagingModuleConfiguration.shared.messageDataProvider.handleFailedAttachmentUpload(attachmentID)
failAssociatedMessageSendJob(e)
}
private fun handleFailure(e: Exception) {
private fun handleFailure(dispatcherName: String, e: Exception) {
Log.w(TAG, "Attachment upload failed due to error: $this.")
delegate?.handleJobFailed(this, e)
delegate?.handleJobFailed(this, dispatcherName, e)
if (failureCount + 1 >= maxFailureCount) {
failAssociatedMessageSendJob(e)
}

View File

@ -29,14 +29,14 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
return "$server.$room"
}
override fun execute() {
override fun execute(dispatcherName: String) {
try {
val openGroup = OpenGroupUrlParser.parseUrl(joinUrl)
val storage = MessagingModuleConfiguration.shared.storage
val allOpenGroups = storage.getAllOpenGroups().map { it.value.joinURL }
if (allOpenGroups.contains(openGroup.joinUrl())) {
Log.e("OpenGroupDispatcher", "Failed to add group because", DuplicateGroupException())
delegate?.handleJobFailed(this, DuplicateGroupException())
delegate?.handleJobFailed(this, dispatcherName, DuplicateGroupException())
return
}
// get image
@ -50,11 +50,11 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
storage.onOpenGroupAdded(openGroup.server)
} catch (e: Exception) {
Log.e("OpenGroupDispatcher", "Failed to add group because",e)
delegate?.handleJobFailed(this, e)
delegate?.handleJobFailed(this, dispatcherName, e)
return
}
Log.d("Loki", "Group added successfully")
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
}
override fun serialize(): Data = Data.Builder()

View File

@ -66,11 +66,11 @@ class BatchMessageReceiveJob(
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
}
override fun execute() {
executeAsync().get()
override fun execute(dispatcherName: String) {
executeAsync(dispatcherName).get()
}
fun executeAsync(): Promise<Unit, Exception> {
fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {
return task {
val threadMap = mutableMapOf<Long, MutableList<ParsedMessage>>()
val storage = MessagingModuleConfiguration.shared.storage
@ -188,19 +188,21 @@ class BatchMessageReceiveJob(
deferredThreadMap.awaitAll()
}
if (failures.isEmpty()) {
handleSuccess()
handleSuccess(dispatcherName)
} else {
handleFailure()
handleFailure(dispatcherName)
}
}
}
private fun handleSuccess() {
this.delegate?.handleJobSucceeded(this)
private fun handleSuccess(dispatcherName: String) {
Log.i(TAG, "Completed processing of ${messages.size} messages")
this.delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handleFailure() {
this.delegate?.handleJobFailed(this, Exception("One or more jobs resulted in failure"))
private fun handleFailure(dispatcherName: String) {
Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully)")
this.delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure"))
}
override fun serialize(): Data {

View File

@ -12,7 +12,7 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job {
override var failureCount: Int = 0
override val maxFailureCount: Int = 10
override fun execute() {
override fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage
val imageId = storage.getOpenGroup(room, server)?.imageId ?: return
try {
@ -20,9 +20,9 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job {
val groupId = GroupUtil.getEncodedOpenGroupID("$server.$room".toByteArray())
storage.updateProfilePicture(groupId, bytes)
storage.updateTimestampUpdated(groupId, System.currentTimeMillis())
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
} catch (e: Exception) {
delegate?.handleJobFailed(this, e)
delegate?.handleJobFailed(this, dispatcherName, e)
}
}

View File

@ -17,7 +17,7 @@ interface Job {
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
}
fun execute()
fun execute(dispatcherName: String)
fun serialize(): Data

View File

@ -2,7 +2,7 @@ package org.session.libsession.messaging.jobs
interface JobDelegate {
fun handleJobSucceeded(job: Job)
fun handleJobFailed(job: Job, error: Exception)
fun handleJobFailedPermanently(job: Job, error: Exception)
fun handleJobSucceeded(job: Job, dispatcherName: String)
fun handleJobFailed(job: Job, dispatcherName: String, error: Exception)
fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception)
}

View File

@ -53,7 +53,7 @@ class JobQueue : JobDelegate {
}
if (openGroupId.isNullOrEmpty()) {
Log.e("OpenGroupDispatcher", "Open Group ID was null on ${job.javaClass.simpleName}")
handleJobFailedPermanently(job, NullPointerException("Open Group ID was null"))
handleJobFailedPermanently(job, name, NullPointerException("Open Group ID was null"))
} else {
val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) {
Log.d("OpenGroupDispatcher", "Creating ${openGroupId.hashCode()} channel")
@ -95,9 +95,16 @@ class JobQueue : JobDelegate {
}
private fun Job.process(dispatcherName: String) {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName}")
Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)")
delegate = this@JobQueue
execute()
try {
execute(dispatcherName)
}
catch (e: Exception) {
Log.d(dispatcherName, "unhandledJobException: ${javaClass.simpleName} (id: $id)")
this@JobQueue.handleJobFailed(this, dispatcherName, e)
}
}
init {
@ -177,7 +184,7 @@ class JobQueue : JobDelegate {
return
}
if (!pendingJobIds.add(id)) {
Log.e("Loki","tried to re-queue pending/in-progress job")
Log.e("Loki","tried to re-queue pending/in-progress job (id: $id)")
return
}
queue.trySend(job)
@ -196,7 +203,7 @@ class JobQueue : JobDelegate {
}
}
pendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.")
Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName} (id: ${job.id}).")
queue.trySend(job) // Offer always called on unlimited capacity
}
}
@ -223,21 +230,21 @@ class JobQueue : JobDelegate {
}
}
override fun handleJobSucceeded(job: Job) {
override fun handleJobSucceeded(job: Job, dispatcherName: String) {
val jobId = job.id ?: return
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
pendingJobIds.remove(jobId)
}
override fun handleJobFailed(job: Job, error: Exception) {
override fun handleJobFailed(job: Job, dispatcherName: String, error: Exception) {
// Canceled
val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) {
return Log.i("Loki", "${job::class.simpleName} canceled.")
return Log.i("Loki", "${job::class.simpleName} canceled (id: ${job.id}).")
}
// Message send jobs waiting for the attachment to upload
if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) {
Log.i("Loki", "Message send job waiting for attachment upload to finish.")
Log.i("Loki", "Message send job waiting for attachment upload to finish (id: ${job.id}).")
return
}
@ -255,21 +262,22 @@ class JobQueue : JobDelegate {
job.failureCount += 1
if (job.failureCount >= job.maxFailureCount) {
handleJobFailedPermanently(job, error)
handleJobFailedPermanently(job, dispatcherName, error)
} else {
storage.persistJob(job)
val retryInterval = getRetryInterval(job)
Log.i("Loki", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
Log.i("Loki", "${job::class.simpleName} failed (id: ${job.id}); scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) {
Log.i("Loki", "Retrying ${job::class.simpleName}.")
Log.i("Loki", "Retrying ${job::class.simpleName} (id: ${job.id}).")
queue.trySend(job)
}
}
}
override fun handleJobFailedPermanently(job: Job, error: Exception) {
override fun handleJobFailedPermanently(job: Job, dispatcherName: String, error: Exception) {
val jobId = job.id ?: return
handleJobFailedPermanently(jobId)
Log.d(dispatcherName, "permanentlyFailedJob: ${javaClass.simpleName} (id: ${job.id})")
}
private fun handleJobFailedPermanently(jobId: String) {

View File

@ -25,11 +25,11 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
override fun execute() {
executeAsync().get()
override fun execute(dispatcherName: String) {
executeAsync(dispatcherName).get()
}
fun executeAsync(): Promise<Unit, Exception> {
fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
try {
val isRetry: Boolean = failureCount != 0
@ -39,32 +39,32 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey)
message.serverHash = serverHash
MessageReceiver.handle(message, proto, this.openGroupID)
this.handleSuccess()
this.handleSuccess(dispatcherName)
deferred.resolve(Unit)
} catch (e: Exception) {
Log.e(TAG, "Couldn't receive message.", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e("Loki", "Message receive job permanently failed.", e)
this.handlePermanentFailure(e)
this.handlePermanentFailure(dispatcherName, e)
} else {
Log.e("Loki", "Couldn't receive message.", e)
this.handleFailure(e)
this.handleFailure(dispatcherName, e)
}
deferred.resolve(Unit) // The promise is just used to keep track of when we're done
}
return deferred.promise
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handlePermanentFailure(e: Exception) {
delegate?.handleJobFailedPermanently(this, e)
private fun handlePermanentFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailedPermanently(this, dispatcherName, e)
}
private fun handleFailure(e: Exception) {
delegate?.handleJobFailed(this, e)
private fun handleFailure(dispatcherName: String, e: Exception) {
delegate?.handleJobFailed(this, dispatcherName, e)
}
override fun serialize(): Data {

View File

@ -33,7 +33,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
private val DESTINATION_KEY = "destination"
}
override fun execute() {
override fun execute(dispatcherName: String) {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage
val storage = MessagingModuleConfiguration.shared.storage
@ -61,12 +61,12 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
}
}
if (attachmentsToUpload.isNotEmpty()) {
this.handleFailure(AwaitingAttachmentUploadException)
this.handleFailure(dispatcherName, AwaitingAttachmentUploadException)
return
} // Wait for all attachments to upload before continuing
}
val promise = MessageSender.send(this.message, this.destination).success {
this.handleSuccess()
this.handleSuccess(dispatcherName)
}.fail { exception ->
var logStacktrace = true
@ -75,14 +75,14 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
is HTTP.HTTPRequestFailedException -> {
logStacktrace = false
if (exception.statusCode == 429) { this.handlePermanentFailure(exception) }
else { this.handleFailure(exception) }
if (exception.statusCode == 429) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(dispatcherName, exception) }
}
is MessageSender.Error -> {
if (!exception.isRetryable) { this.handlePermanentFailure(exception) }
else { this.handleFailure(exception) }
if (!exception.isRetryable) { this.handlePermanentFailure(dispatcherName, exception) }
else { this.handleFailure(dispatcherName, exception) }
}
else -> this.handleFailure(exception)
else -> this.handleFailure(dispatcherName, exception)
}
if (logStacktrace) { Log.e(TAG, "Couldn't send message due to error", exception) }
@ -95,15 +95,15 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
}
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handlePermanentFailure(error: Exception) {
delegate?.handleJobFailedPermanently(this, error)
private fun handlePermanentFailure(dispatcherName: String, error: Exception) {
delegate?.handleJobFailedPermanently(this, dispatcherName, error)
}
private fun handleFailure(error: Exception) {
private fun handleFailure(dispatcherName: String, error: Exception) {
Log.w(TAG, "Failed to send $message::class.simpleName.")
val message = message as? VisibleMessage
if (message != null) {
@ -111,7 +111,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
return // The message has been deleted
}
}
delegate?.handleJobFailed(this, error)
delegate?.handleJobFailed(this, dispatcherName, error)
}
override fun serialize(): Data {

View File

@ -32,7 +32,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
private val MESSAGE_KEY = "message"
}
override fun execute() {
override fun execute(dispatcherName: String) {
val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
val url = "${server}/notify"
@ -48,18 +48,18 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
Log.d("Loki", "Couldn't notify PN server due to error: $exception.")
}
}.success {
handleSuccess()
handleSuccess(dispatcherName)
}. fail {
handleFailure(it)
handleFailure(dispatcherName, it)
}
}
private fun handleSuccess() {
delegate?.handleJobSucceeded(this)
private fun handleSuccess(dispatcherName: String) {
delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handleFailure(error: Exception) {
delegate?.handleJobFailed(this, error)
private fun handleFailure(dispatcherName: String, error: Exception) {
delegate?.handleJobFailed(this, dispatcherName, error)
}
override fun serialize(): Data {

View File

@ -19,7 +19,7 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override fun execute() {
override fun execute(dispatcherName: String) {
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages")
@ -39,10 +39,10 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
}
Log.d(TAG, "Deleted ${messageIds.first.size + messageIds.second.size} messages successfully")
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
}
catch (e: Exception) {
delegate?.handleJobFailed(this, e)
delegate?.handleJobFailed(this, dispatcherName, e)
}
}

View File

@ -20,7 +20,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
const val THREAD_LENGTH_TRIGGER_SIZE = 2000
}
override fun execute() {
override fun execute(dispatcherName: String) {
val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val storage = MessagingModuleConfiguration.shared.storage
@ -29,7 +29,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
val oldestMessageTime = System.currentTimeMillis() - TRIM_TIME_LIMIT
storage.trimThreadBefore(threadId, oldestMessageTime)
}
delegate?.handleJobSucceeded(this)
delegate?.handleJobSucceeded(this, dispatcherName)
}
override fun serialize(): Data {