diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java index 4ee953b0b..75dca9601 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.java @@ -46,7 +46,6 @@ import org.thoughtcrime.securesms.database.model.MediaMmsMessageRecord; import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.NotificationMmsMessageRecord; import org.thoughtcrime.securesms.database.model.Quote; -import org.thoughtcrime.securesms.jobs.TrimThreadJob; import org.session.libsession.messaging.messages.signal.IncomingMediaMessage; import org.thoughtcrime.securesms.mms.MmsException; import org.session.libsession.messaging.messages.signal.OutgoingExpirationUpdateMessage; @@ -668,7 +667,6 @@ public class MmsDatabase extends MessagingDatabase { } notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return Optional.of(new InsertResult(messageId, threadId)); } @@ -812,7 +810,6 @@ public class MmsDatabase extends MessagingDatabase { DatabaseFactory.getThreadDatabase(context).setLastSeen(threadId); DatabaseFactory.getThreadDatabase(context).setHasSent(threadId, true); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return messageId; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java index 613bff741..dfbc39f15 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java @@ -35,7 +35,6 @@ import org.session.libsession.utilities.IdentityKeyMismatchList; import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper; import org.thoughtcrime.securesms.database.model.MessageRecord; import org.thoughtcrime.securesms.database.model.SmsMessageRecord; -import org.thoughtcrime.securesms.jobs.TrimThreadJob; import org.session.libsession.messaging.messages.signal.IncomingGroupMessage; import org.session.libsession.messaging.messages.signal.IncomingTextMessage; import org.session.libsession.messaging.messages.signal.OutgoingTextMessage; @@ -414,7 +413,6 @@ public class SmsDatabase extends MessagingDatabase { notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return Optional.of(new InsertResult(messageId, threadId)); } @@ -484,7 +482,6 @@ public class SmsDatabase extends MessagingDatabase { notifyConversationListeners(threadId); - ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId)); return messageId; } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 5f498fa10..47e6ab99c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -1,5 +1,6 @@ package org.thoughtcrime.securesms.database +import android.app.job.JobScheduler import android.content.Context import android.net.Uri import org.session.libsession.database.StorageProtocol @@ -161,6 +162,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, messageID = result.messageId } } + val threadID = message.threadID + if (openGroupID.isNullOrEmpty() && threadID != null && threadID >= 0) { + JobQueue.shared.add(TrimThreadJob(threadID)) + // open group trim thread job is scheduled after processing + } return messageID } @@ -539,6 +545,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, return threadDB.getLastUpdated(threadID) } + override fun trimThread(threadID: Long, threadLimit: Int) { + val threadDB = DatabaseFactory.getThreadDatabase(context) + threadDB.trimThread(threadID, threadLimit) + } + override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri { return PartAuthority.getAttachmentDataUri(attachmentId) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java index ebb0dbab2..2ba85ca66 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ThreadDatabase.java @@ -423,6 +423,7 @@ public class ThreadDatabase extends Database { DatabaseFactory.getSmsDatabase(context).deleteThread(threadId); DatabaseFactory.getMmsDatabase(context).deleteThread(threadId); DatabaseFactory.getDraftDatabase(context).clearDrafts(threadId); + DatabaseFactory.getLokiMessageDatabase(context).deleteThread(threadId); deleteThread(threadId); notifyConversationListeners(threadId); notifyConversationListListeners(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt index 0e5ccd6fd..9a9a158cf 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/activities/HomeActivity.kt @@ -330,9 +330,6 @@ class HomeActivity : PassphraseRequiredActionBarActivity(), // Delete the conversation val v2OpenGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID) if (v2OpenGroup != null) { - val apiDB = DatabaseFactory.getLokiAPIDatabase(context) - apiDB.removeLastMessageServerID(v2OpenGroup.room, v2OpenGroup.server) - apiDB.removeLastDeletionServerID(v2OpenGroup.room, v2OpenGroup.server) OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity) } else { ThreadUtils.queue { diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt index b20de78bd..a1b2b208f 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt @@ -89,7 +89,7 @@ object OpenGroupManager { val openGroup = OpenGroupV2(server, room, info.name, publicKey) threadDB.setOpenGroupChat(openGroup, threadID) // Start the poller if needed - if (pollers[server] == null) { + pollers[server]?.startIfNeeded() ?: run { val poller = OpenGroupPollerV2(server, executorService) Util.runOnMain { poller.startIfNeeded() } pollers[server] = poller @@ -111,9 +111,11 @@ object OpenGroupManager { pollers.remove(server) } // Delete + storage.removeLastDeletionServerID(room, server) + storage.removeLastMessageServerID(room, server) + val lokiThreadDB = DatabaseFactory.getLokiThreadDatabase(context) + lokiThreadDB.removeOpenGroupChat(threadID) ThreadUtils.queue { - storage.removeLastDeletionServerID(room, server) - storage.removeLastMessageServerID(room, server) GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt index 83e64f884..137579f07 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiMessageDatabase.kt @@ -8,6 +8,7 @@ import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.loki.utilities.* import org.session.libsignal.database.LokiMessageDatabaseProtocol +import org.session.libsignal.utilities.Log class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), LokiMessageDatabaseProtocol { @@ -131,4 +132,31 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab contentValues.put(Companion.errorMessage, errorMessage) database.insertOrUpdate(errorMessageTable, contentValues, "${Companion.messageID} = ?", arrayOf(messageID.toString())) } + + fun deleteThread(threadId: Long) { + val database = databaseHelper.writableDatabase + try { + val messages = mutableSetOf>() + database.get(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) { cursor -> + // for each add + while (cursor.moveToNext()) { + messages.add(cursor.getLong(Companion.messageID) to cursor.getLong(Companion.serverID)) + } + } + Log.d("Test", "Need to delete ${messages.size} number of messages") + + var deletedCount = 0L + + database.beginTransaction() + messages.forEach { (messageId, serverId) -> + deletedCount += database.delete(messageIDTable, "${Companion.messageID} = ? AND ${Companion.serverID} = ?", arrayOf(messageId.toString(), serverId.toString())) + } + Log.d("Test", "Deleted $deletedCount from messageIDTable") + val mappingDeleted = database.delete(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) + Log.d("Test", "Deleted $mappingDeleted from mapping table") + database.setTransactionSuccessful() + } finally { + database.endTransaction() + } + } } \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt index b0e1cfe99..f0f6123b2 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/LokiThreadDatabase.kt @@ -73,4 +73,12 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa contentValues.put(publicChat, JsonUtil.toJson(openGroupV2.toJson())) database.insertOrUpdate(publicChatTable, contentValues, "${Companion.threadID} = ?", arrayOf(threadID.toString())) } + + fun removeOpenGroupChat(threadID: Long) { + if (threadID < 0) return + + val database = databaseHelper.writableDatabase + database.delete(publicChatTable,"${Companion.threadID} = ?", arrayOf(threadID.toString())) + } + } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 00b856d80..ea5931f3b 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -130,6 +130,7 @@ interface StorageProtocol { fun getThreadId(recipient: Recipient): Long? fun getThreadIdForMms(mmsId: Long): Long fun getLastUpdated(threadID: Long): Long + fun trimThread(threadID: Long, threadLimit: Int) // Contacts fun getContactWithSessionID(sessionID: String): Contact? diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 147a9f107..e6803e89d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -37,9 +37,9 @@ class JobQueue : JobDelegate { init { // Process jobs scope.launch { - val rxQueue = Channel(capacity = 1024) - val txQueue = Channel(capacity = 1024) - val attachmentQueue = Channel(capacity = 1024) + val rxQueue = Channel(capacity = 4096) + val txQueue = Channel(capacity = 4096) + val attachmentQueue = Channel(capacity = 4096) val receiveJob = processWithDispatcher(rxQueue, rxDispatcher) val txJob = processWithDispatcher(txQueue, txDispatcher) @@ -50,7 +50,7 @@ class JobQueue : JobDelegate { when (job) { is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job) is AttachmentDownloadJob -> attachmentQueue.send(job) - is MessageReceiveJob -> rxQueue.send(job) + is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job) else -> throw IllegalStateException("Unexpected job type.") } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt index c681a67f3..15526981c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt @@ -10,7 +10,8 @@ class SessionJobManagerFactories { AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(), MessageReceiveJob.KEY to MessageReceiveJob.Factory(), MessageSendJob.KEY to MessageSendJob.Factory(), - NotifyPNServerJob.KEY to NotifyPNServerJob.Factory() + NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(), + TrimThreadJob.KEY to TrimThreadJob.Factory() ) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt new file mode 100644 index 000000000..1122e7329 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/TrimThreadJob.kt @@ -0,0 +1,47 @@ +package org.session.libsession.messaging.jobs + +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.utilities.Data +import org.session.libsession.utilities.TextSecurePreferences + +class TrimThreadJob(val threadId: Long) : Job { + companion object { + const val KEY: String = "TrimThreadJob" + + const val THREAD_ID = "thread_id" + } + + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + + override val maxFailureCount: Int = 1 + + override fun execute() { + val context = MessagingModuleConfiguration.shared.context + val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context) + val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context) + + if (trimmingEnabled) { + MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit) + } + + delegate?.handleJobSucceeded(this) + } + + override fun serialize(): Data { + return Data.Builder() + .putLong(THREAD_ID, threadId) + .build() + } + + override fun getFactoryKey(): String = "TrimThreadJob" + + class Factory : Job.Factory { + + override fun create(data: Data): TrimThreadJob { + return TrimThreadJob(data.getLong(THREAD_ID)) + } + } + +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt index d6f9aa6c6..a1bcfe441 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/open_groups/OpenGroupAPIV2.kt @@ -258,9 +258,6 @@ object OpenGroupAPIV2 { } private fun parseMessages(room: String, server: String, rawMessages: List>): List { - val storage = MessagingModuleConfiguration.shared.storage - val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 - var currentLastMessageServerID = lastMessageServerID val messages = rawMessages.mapNotNull { json -> json as Map try { @@ -275,15 +272,11 @@ object OpenGroupAPIV2 { Log.d("Loki", "Ignoring message with invalid signature.") return@mapNotNull null } - if (message.serverID > lastMessageServerID) { - currentLastMessageServerID = message.serverID - } message } catch (e: Exception) { null } } - storage.setLastMessageServerID(room, server, currentLastMessageServerID) return messages } // endregion @@ -404,11 +397,6 @@ object OpenGroupAPIV2 { val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java) val idsAsString = JsonUtil.toJson(json["deletions"]) val deletedServerIDs = JsonUtil.fromJson>(idsAsString, type) ?: throw Error.ParsingFailed - val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0 - val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty - if (serverID.id > lastDeletionServerID) { - storage.setLastDeletionServerID(roomID, server, serverID.id) - } // Messages val rawMessages = json["messages"] as? List> ?: return@mapNotNull null val messages = parseMessages(roomID, server, rawMessages) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt index 4b9c9a707..57ec540da 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.jobs.TrimThreadJob import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 import org.session.libsession.utilities.Address @@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import kotlin.math.max class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { var hasStarted = false @@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService: return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses -> responses.forEach { (room, response) -> val openGroupID = "$server.$room" - handleNewMessages(openGroupID, response.messages, isBackgroundPoll) - handleDeletedMessages(openGroupID, response.deletions) + handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll) + handleDeletedMessages(room, openGroupID, response.deletions) if (secondToLastJob == null && !isCaughtUp) { isCaughtUp = true } @@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService: }.map { } } - private fun handleNewMessages(openGroupID: String, messages: List, isBackgroundPoll: Boolean) { - if (!hasStarted) { return } + private fun handleNewMessages(room: String, openGroupID: String, messages: List, isBackgroundPoll: Boolean) { + val storage = MessagingModuleConfiguration.shared.storage + val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) + // check thread still exists + val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1 + val threadExists = threadId >= 0 + if (!hasStarted || !threadExists) { return } var latestJob: MessageReceiveJob? = null messages.sortedBy { it.serverID!! }.forEach { message -> try { @@ -82,9 +89,15 @@ class OpenGroupPollerV2(private val server: String, private val executorService: Log.e("Loki", "Exception parsing message", e) } } + val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0 + val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID) + if (actualMax > 0) { + storage.setLastMessageServerID(room, server, actualMax) + } + JobQueue.shared.add(TrimThreadJob(threadId)) } - private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List) { + private fun handleDeletedMessages(room: String, openGroupID: String, deletedMessageServerIDs: List) { val storage = MessagingModuleConfiguration.shared.storage val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) @@ -99,5 +112,10 @@ class OpenGroupPollerV2(private val server: String, private val executorService: deletedMessageIDs.forEach { (messageId, isSms) -> MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) } + val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L + val latestMax = deletedMessageServerIDs.maxOrNull() ?: 0L + if (latestMax > currentMax && latestMax != 0L) { + storage.setLastDeletionServerID(room, server, latestMax) + } } } \ No newline at end of file