From efa36d4ceab0fdb1de927e714cc5e17269f90ca4 Mon Sep 17 00:00:00 2001 From: jubb Date: Wed, 26 May 2021 15:22:19 +1000 Subject: [PATCH] refactor: trim thread is now queued after batch processing messages, for other conversations still after every persisted message migrate TrimThreadJob.kt to new job system deleting more open group references in removal open group last message / last deletion now sets after processing vs after fetching --- .../securesms/database/MmsDatabase.java | 3 -- .../securesms/database/SmsDatabase.java | 3 -- .../securesms/database/Storage.kt | 11 +++++ .../securesms/database/ThreadDatabase.java | 1 + .../securesms/loki/activities/HomeActivity.kt | 3 -- .../securesms/loki/api/OpenGroupManager.kt | 8 ++-- .../loki/database/LokiMessageDatabase.kt | 28 +++++++++++ .../loki/database/LokiThreadDatabase.kt | 8 ++++ .../libsession/database/StorageProtocol.kt | 1 + .../libsession/messaging/jobs/JobQueue.kt | 8 ++-- .../jobs/SessionJobManagerFactories.kt | 3 +- .../messaging/jobs/TrimThreadJob.kt | 47 +++++++++++++++++++ .../messaging/open_groups/OpenGroupAPIV2.kt | 12 ----- .../pollers/OpenGroupPollerV2.kt | 28 +++++++++-- 14 files changed, 130 insertions(+), 34 deletions(-) create mode 100644 libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java index 4ee953b0b..75dca9601 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java @@ -46,7 +46,6 @@ import org.thoughtcrime.securesms.database.model.MediaMmsMessageRecord; import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.NotificationMmsMessageRecord; import org.thoughtcrime.securesms.database.model.Quote; -import org.thoughtcrime.securesms.jobs.TrimThreadJob; import org.session.libsession.messaging.messages.signal.IncomingMediaMessage; import org.thoughtcrime.securesms.mms.MmsException; import org.session.libsession.messaging.messages.signal.OutgoingExpirationUpdateMessage; @@ -668,7 +667,6 @@ public class MmsDatabase extends MessagingDatabase { } notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return Optional.of(new InsertResult(messageId, threadId)); } @@ -812,7 +810,6 @@ public class MmsDatabase extends MessagingDatabase { DatabaseFactory.getThreadDatabase(context).setLastSeen(threadId); DatabaseFactory.getThreadDatabase(context).setHasSent(threadId, true); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return messageId; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java index 613bff741..dfbc39f15 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java @@ -35,7 +35,6 @@ import org.session.libsession.utilities.IdentityKeyMismatchList; import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper; import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.SmsMessageRecord; -import org.thoughtcrime.securesms.jobs.TrimThreadJob; import org.session.libsession.messaging.messages.signal.IncomingGroupMessage; import org.session.libsession.messaging.messages.signal.IncomingTextMessage; import org.session.libsession.messaging.messages.signal.OutgoingTextMessage; @@ -414,7 +413,6 @@ public class SmsDatabase extends MessagingDatabase { notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return Optional.of(new InsertResult(messageId, threadId)); } @@ -484,7 +482,6 @@ public class SmsDatabase extends MessagingDatabase { notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return messageId; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 5f498fa10..47e6ab99c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -1,5 +1,6 @@ package org.thoughtcrime.securesms.database +import android.app.job.JobScheduler import android.content.Context import android.net.Uri import org.session.libsession.database.StorageProtocol @@ -161,6 +162,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, messageID = result.messageId } } + val threadID = message.threadID + if (openGroupID.isNullOrEmpty() && threadID != null && threadID >= 0) { + JobQueue.shared.add(TrimThreadJob(threadID)) + // open group trim thread job is scheduled after processing + } return messageID } @@ -539,6 +545,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, return threadDB.getLastUpdated(threadID) } + override fun trimThread(threadID: Long, threadLimit: Int) { + val threadDB = DatabaseFactory.getThreadDatabase(context) + threadDB.trimThread(threadID, threadLimit) + } + override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri { return PartAuthority.getAttachmentDataUri(attachmentId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index ebb0dbab2..2ba85ca66 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -423,6 +423,7 @@ public class ThreadDatabase extends Database { DatabaseFactory.getSmsDatabase(context).deleteThread(threadId); DatabaseFactory.getMmsDatabase(context).deleteThread(threadId); DatabaseFactory.getDraftDatabase(context).clearDrafts(threadId); + DatabaseFactory.getLokiMessageDatabase(context).deleteThread(threadId); deleteThread(threadId); notifyConversationListeners(threadId); notifyConversationListListeners(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt index 0e5ccd6fd..9a9a158cf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt @@ -330,9 +330,6 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), // Delete the conversation val v2OpenGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID) if (v2OpenGroup != null) { - val apiDB = DatabaseFactory.getLokiAPIDatabase(context) - apiDB.removeLastMessageServerID(v2OpenGroup.room, v2OpenGroup.server) - apiDB.removeLastDeletionServerID(v2OpenGroup.room, v2OpenGroup.server) OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity) } else { ThreadUtils.queue { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt index b20de78bd..a1b2b208f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt @@ -89,7 +89,7 @@ object OpenGroupManager { val openGroup = OpenGroupV2(server, room, info.name, publicKey) threadDB.setOpenGroupChat(openGroup, threadID) // Start the poller if needed - if (pollers[server] == null) { + pollers[server]?.startIfNeeded() ?: run { val poller = OpenGroupPollerV2(server, executorService) Util.runOnMain { poller.startIfNeeded() } pollers[server] = poller @@ -111,9 +111,11 @@ object OpenGroupManager { pollers.remove(server) } // Delete + storage.removeLastDeletionServerID(room, server) + storage.removeLastMessageServerID(room, server) + val lokiThreadDB = DatabaseFactory.getLokiThreadDatabase(context) + lokiThreadDB.removeOpenGroupChat(threadID) ThreadUtils.queue { - storage.removeLastDeletionServerID(room, server) - storage.removeLastMessageServerID(room, server) GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt index 83e64f884..137579f07 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt @@ -8,6 +8,7 @@ import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.loki.utilities.* import org.session.libsignal.database.LokiMessageDatabaseProtocol +import org.session.libsignal.utilities.Log class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), LokiMessageDatabaseProtocol { @@ -131,4 +132,31 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab contentValues.put(Companion.errorMessage, errorMessage) database.insertOrUpdate(errorMessageTable, contentValues, "${Companion.messageID} = ?", arrayOf(messageID.toString())) } + + fun deleteThread(threadId: Long) { + val database = databaseHelper.writableDatabase + try { + val messages = mutableSetOf>() + database.get(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) { cursor -> + // for each add + while (cursor.moveToNext()) { + messages.add(cursor.getLong(Companion.messageID) to cursor.getLong(Companion.serverID)) + } + } + Log.d("Test", "Need to delete ${messages.size} number of messages") + + var deletedCount = 0L + + database.beginTransaction() + messages.forEach { (messageId, serverId) -> + deletedCount += database.delete(messageIDTable, "${Companion.messageID} = ? AND ${Companion.serverID} = ?", arrayOf(messageId.toString(), serverId.toString())) + } + Log.d("Test", "Deleted $deletedCount from messageIDTable") + val mappingDeleted = database.delete(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) + Log.d("Test", "Deleted $mappingDeleted from mapping table") + database.setTransactionSuccessful() + } finally { + database.endTransaction() + } + } } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt index b0e1cfe99..f0f6123b2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt @@ -73,4 +73,12 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa contentValues.put(publicChat, JsonUtil.toJson(openGroupV2.toJson())) database.insertOrUpdate(publicChatTable, contentValues, "${Companion.threadID} = ?", arrayOf(threadID.toString())) } + + fun removeOpenGroupChat(threadID: Long) { + if (threadID < 0) return + + val database = databaseHelper.writableDatabase + database.delete(publicChatTable,"${Companion.threadID} = ?", arrayOf(threadID.toString())) + } + } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 00b856d80..ea5931f3b 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -130,6 +130,7 @@ interface StorageProtocol { fun getThreadId(recipient: Recipient): Long? fun getThreadIdForMms(mmsId: Long): Long fun getLastUpdated(threadID: Long): Long + fun trimThread(threadID: Long, threadLimit: Int) // Contacts fun getContactWithSessionID(sessionID: String): Contact? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 147a9f107..e6803e89d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -37,9 +37,9 @@ class JobQueue : JobDelegate { init { // Process jobs scope.launch { - val rxQueue = Channel(capacity = 1024) - val txQueue = Channel(capacity = 1024) - val attachmentQueue = Channel(capacity = 1024) + val rxQueue = Channel(capacity = 4096) + val txQueue = Channel(capacity = 4096) + val attachmentQueue = Channel(capacity = 4096) val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) val txJob = processWithDispatcher(txQueue, txDispatcher) @@ -50,7 +50,7 @@ class JobQueue : JobDelegate { when (job) { is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) is AttachmentDownloadJob -> attachmentQueue.send(job) - is MessageReceiveJob -> rxQueue.send(job) + is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job) else -> throw IllegalStateException("Unexpected job type.") } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt index c681a67f3..15526981c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt @@ -10,7 +10,8 @@ class SessionJobManagerFactories { AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(), MessageReceiveJob.KEY to MessageReceiveJob.Factory(), MessageSendJob.KEY to MessageSendJob.Factory(), - NotifyPNServerJob.KEY to NotifyPNServerJob.Factory() + NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(), + TrimThreadJob.KEY to TrimThreadJob.Factory() ) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt new file mode 100644 index 000000000..1122e7329 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt @@ -0,0 +1,47 @@ +package org.session.libsession.messaging.jobs + +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.utilities.Data +import org.session.libsession.utilities.TextSecurePreferences + +class TrimThreadJob(val threadId: Long) : Job { + companion object { + const val KEY: String = "TrimThreadJob" + + const val THREAD_ID = "thread_id" + } + + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + override val maxFailureCount: Int = 1 + + override fun execute() { + val context = MessagingModuleConfiguration.shared.context + val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context) + val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context) + + if (trimmingEnabled) { + MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit) + } + + delegate?.handleJobSucceeded(this) + } + + override fun serialize(): Data { + return Data.Builder() + .putLong(THREAD_ID, threadId) + .build() + } + + override fun getFactoryKey(): String = "TrimThreadJob" + + class Factory : Job.Factory { + + override fun create(data: Data): TrimThreadJob { + return TrimThreadJob(data.getLong(THREAD_ID)) + } + } + +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt index d6f9aa6c6..a1bcfe441 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt @@ -258,9 +258,6 @@ object OpenGroupAPIV2 { } private fun parseMessages(room: String, server: String, rawMessages: List>): List { - val storage = MessagingModuleConfiguration.shared.storage - val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 - var currentLastMessageServerID = lastMessageServerID val messages = rawMessages.mapNotNull { json -> json as Map try { @@ -275,15 +272,11 @@ object OpenGroupAPIV2 { Log.d("Loki", "Ignoring message with invalid signature.") return@mapNotNull null } - if (message.serverID > lastMessageServerID) { - currentLastMessageServerID = message.serverID - } message } catch (e: Exception) { null } } - storage.setLastMessageServerID(room, server, currentLastMessageServerID) return messages } // endregion @@ -404,11 +397,6 @@ object OpenGroupAPIV2 { val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java) val idsAsString = JsonUtil.toJson(json["deletions"]) val deletedServerIDs = JsonUtil.fromJson>(idsAsString, type) ?: throw Error.ParsingFailed - val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0 - val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty - if (serverID.id > lastDeletionServerID) { - storage.setLastDeletionServerID(roomID, server, serverID.id) - } // Messages val rawMessages = json["messages"] as? List> ?: return@mapNotNull null val messages = parseMessages(roomID, server, rawMessages) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt index 4b9c9a707..57ec540da 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.jobs.TrimThreadJob import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 import org.session.libsession.utilities.Address @@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import kotlin.math.max class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { var hasStarted = false @@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService: return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses -> responses.forEach { (room, response) -> val openGroupID = "$server.$room" - handleNewMessages(openGroupID, response.messages, isBackgroundPoll) - handleDeletedMessages(openGroupID, response.deletions) + handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll) + handleDeletedMessages(room, openGroupID, response.deletions) if (secondToLastJob == null && !isCaughtUp) { isCaughtUp = true } @@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService: }.map { } } - private fun handleNewMessages(openGroupID: String, messages: List, isBackgroundPoll: Boolean) { - if (!hasStarted) { return } + private fun handleNewMessages(room: String, openGroupID: String, messages: List, isBackgroundPoll: Boolean) { + val storage = MessagingModuleConfiguration.shared.storage + val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) + // check thread still exists + val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1 + val threadExists = threadId >= 0 + if (!hasStarted || !threadExists) { return } var latestJob: MessageReceiveJob? = null messages.sortedBy { it.serverID!! }.forEach { message -> try { @@ -82,9 +89,15 @@ class OpenGroupPollerV2(private val server: String, private val executorService: Log.e("Loki", "Exception parsing message", e) } } + val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 + val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID) + if (actualMax > 0) { + storage.setLastMessageServerID(room, server, actualMax) + } + JobQueue.shared.add(TrimThreadJob(threadId)) } - private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List) { + private fun handleDeletedMessages(room: String, openGroupID: String, deletedMessageServerIDs: List) { val storage = MessagingModuleConfiguration.shared.storage val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) @@ -99,5 +112,10 @@ class OpenGroupPollerV2(private val server: String, private val executorService: deletedMessageIDs.forEach { (messageId, isSms) -> MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) } + val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L + val latestMax = deletedMessageServerIDs.maxOrNull() ?: 0L + if (latestMax > currentMax && latestMax != 0L) { + storage.setLastDeletionServerID(room, server, latestMax) + } } } \ No newline at end of file