refactor: trim thread is now queued after batch processing messages, for other conversations still after every persisted message

migrate TrimThreadJob.kt to new job system

deleting more open group references in removal

open group last message / last deletion now sets after processing vs after fetching
This commit is contained in:
jubb 2021-05-26 15:22:19 +10:00
parent 02bbd3b8bf
commit efa36d4cea
14 changed files with 130 additions and 34 deletions

View File

@ -46,7 +46,6 @@ import org.thoughtcrime.securesms.database.model.MediaMmsMessageRecord;
import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.NotificationMmsMessageRecord;
import org.thoughtcrime.securesms.database.model.Quote;
import org.thoughtcrime.securesms.jobs.TrimThreadJob;
import org.session.libsession.messaging.messages.signal.IncomingMediaMessage;
import org.thoughtcrime.securesms.mms.MmsException;
import org.session.libsession.messaging.messages.signal.OutgoingExpirationUpdateMessage;
@ -668,7 +667,6 @@ public class MmsDatabase extends MessagingDatabase {
}
notifyConversationListeners(threadId);
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
return Optional.of(new InsertResult(messageId, threadId));
}
@ -812,7 +810,6 @@ public class MmsDatabase extends MessagingDatabase {
DatabaseFactory.getThreadDatabase(context).setLastSeen(threadId);
DatabaseFactory.getThreadDatabase(context).setHasSent(threadId, true);
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
return messageId;
}

View File

@ -35,7 +35,6 @@ import org.session.libsession.utilities.IdentityKeyMismatchList;
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.database.model.SmsMessageRecord;
import org.thoughtcrime.securesms.jobs.TrimThreadJob;
import org.session.libsession.messaging.messages.signal.IncomingGroupMessage;
import org.session.libsession.messaging.messages.signal.IncomingTextMessage;
import org.session.libsession.messaging.messages.signal.OutgoingTextMessage;
@ -414,7 +413,6 @@ public class SmsDatabase extends MessagingDatabase {
notifyConversationListeners(threadId);
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
return Optional.of(new InsertResult(messageId, threadId));
}
@ -484,7 +482,6 @@ public class SmsDatabase extends MessagingDatabase {
notifyConversationListeners(threadId);
ApplicationContext.getInstance(context).getJobManager().add(new TrimThreadJob(threadId));
return messageId;
}

View File

@ -1,5 +1,6 @@
package org.thoughtcrime.securesms.database
import android.app.job.JobScheduler
import android.content.Context
import android.net.Uri
import org.session.libsession.database.StorageProtocol
@ -161,6 +162,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
messageID = result.messageId
}
}
val threadID = message.threadID
if (openGroupID.isNullOrEmpty() && threadID != null && threadID >= 0) {
JobQueue.shared.add(TrimThreadJob(threadID))
// open group trim thread job is scheduled after processing
}
return messageID
}
@ -539,6 +545,11 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return threadDB.getLastUpdated(threadID)
}
override fun trimThread(threadID: Long, threadLimit: Int) {
val threadDB = DatabaseFactory.getThreadDatabase(context)
threadDB.trimThread(threadID, threadLimit)
}
override fun getAttachmentDataUri(attachmentId: AttachmentId): Uri {
return PartAuthority.getAttachmentDataUri(attachmentId)
}

View File

@ -423,6 +423,7 @@ public class ThreadDatabase extends Database {
DatabaseFactory.getSmsDatabase(context).deleteThread(threadId);
DatabaseFactory.getMmsDatabase(context).deleteThread(threadId);
DatabaseFactory.getDraftDatabase(context).clearDrafts(threadId);
DatabaseFactory.getLokiMessageDatabase(context).deleteThread(threadId);
deleteThread(threadId);
notifyConversationListeners(threadId);
notifyConversationListListeners();

View File

@ -330,9 +330,6 @@ class HomeActivity : PassphraseRequiredActionBarActivity(),
// Delete the conversation
val v2OpenGroup = DatabaseFactory.getLokiThreadDatabase(context).getOpenGroupChat(threadID)
if (v2OpenGroup != null) {
val apiDB = DatabaseFactory.getLokiAPIDatabase(context)
apiDB.removeLastMessageServerID(v2OpenGroup.room, v2OpenGroup.server)
apiDB.removeLastDeletionServerID(v2OpenGroup.room, v2OpenGroup.server)
OpenGroupManager.delete(v2OpenGroup.server, v2OpenGroup.room, this@HomeActivity)
} else {
ThreadUtils.queue {

View File

@ -89,7 +89,7 @@ object OpenGroupManager {
val openGroup = OpenGroupV2(server, room, info.name, publicKey)
threadDB.setOpenGroupChat(openGroup, threadID)
// Start the poller if needed
if (pollers[server] == null) {
pollers[server]?.startIfNeeded() ?: run {
val poller = OpenGroupPollerV2(server, executorService)
Util.runOnMain { poller.startIfNeeded() }
pollers[server] = poller
@ -111,9 +111,11 @@ object OpenGroupManager {
pollers.remove(server)
}
// Delete
storage.removeLastDeletionServerID(room, server)
storage.removeLastMessageServerID(room, server)
val lokiThreadDB = DatabaseFactory.getLokiThreadDatabase(context)
lokiThreadDB.removeOpenGroupChat(threadID)
ThreadUtils.queue {
storage.removeLastDeletionServerID(room, server)
storage.removeLastMessageServerID(room, server)
GroupManager.deleteGroup(groupID, context) // Must be invoked on a background thread
}
}

View File

@ -8,6 +8,7 @@ import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.loki.utilities.*
import org.session.libsignal.database.LokiMessageDatabaseProtocol
import org.session.libsignal.utilities.Log
class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), LokiMessageDatabaseProtocol {
@ -131,4 +132,31 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
contentValues.put(Companion.errorMessage, errorMessage)
database.insertOrUpdate(errorMessageTable, contentValues, "${Companion.messageID} = ?", arrayOf(messageID.toString()))
}
fun deleteThread(threadId: Long) {
val database = databaseHelper.writableDatabase
try {
val messages = mutableSetOf<Pair<Long,Long>>()
database.get(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString())) { cursor ->
// for each add
while (cursor.moveToNext()) {
messages.add(cursor.getLong(Companion.messageID) to cursor.getLong(Companion.serverID))
}
}
Log.d("Test", "Need to delete ${messages.size} number of messages")
var deletedCount = 0L
database.beginTransaction()
messages.forEach { (messageId, serverId) ->
deletedCount += database.delete(messageIDTable, "${Companion.messageID} = ? AND ${Companion.serverID} = ?", arrayOf(messageId.toString(), serverId.toString()))
}
Log.d("Test", "Deleted $deletedCount from messageIDTable")
val mappingDeleted = database.delete(messageThreadMappingTable, "${Companion.threadID} = ?", arrayOf(threadId.toString()))
Log.d("Test", "Deleted $mappingDeleted from mapping table")
database.setTransactionSuccessful()
} finally {
database.endTransaction()
}
}
}

View File

@ -73,4 +73,12 @@ class LokiThreadDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
contentValues.put(publicChat, JsonUtil.toJson(openGroupV2.toJson()))
database.insertOrUpdate(publicChatTable, contentValues, "${Companion.threadID} = ?", arrayOf(threadID.toString()))
}
fun removeOpenGroupChat(threadID: Long) {
if (threadID < 0) return
val database = databaseHelper.writableDatabase
database.delete(publicChatTable,"${Companion.threadID} = ?", arrayOf(threadID.toString()))
}
}

View File

@ -130,6 +130,7 @@ interface StorageProtocol {
fun getThreadId(recipient: Recipient): Long?
fun getThreadIdForMms(mmsId: Long): Long
fun getLastUpdated(threadID: Long): Long
fun trimThread(threadID: Long, threadLimit: Int)
// Contacts
fun getContactWithSessionID(sessionID: String): Contact?

View File

@ -37,9 +37,9 @@ class JobQueue : JobDelegate {
init {
// Process jobs
scope.launch {
val rxQueue = Channel<Job>(capacity = 1024)
val txQueue = Channel<Job>(capacity = 1024)
val attachmentQueue = Channel<Job>(capacity = 1024)
val rxQueue = Channel<Job>(capacity = 4096)
val txQueue = Channel<Job>(capacity = 4096)
val attachmentQueue = Channel<Job>(capacity = 4096)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
val txJob = processWithDispatcher(txQueue, txDispatcher)
@ -50,7 +50,7 @@ class JobQueue : JobDelegate {
when (job) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
is AttachmentDownloadJob -> attachmentQueue.send(job)
is MessageReceiveJob -> rxQueue.send(job)
is MessageReceiveJob, is TrimThreadJob -> rxQueue.send(job)
else -> throw IllegalStateException("Unexpected job type.")
}
}

View File

@ -10,7 +10,8 @@ class SessionJobManagerFactories {
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
MessageSendJob.KEY to MessageSendJob.Factory(),
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory()
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(),
TrimThreadJob.KEY to TrimThreadJob.Factory()
)
}
}

View File

@ -0,0 +1,47 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.TextSecurePreferences
class TrimThreadJob(val threadId: Long) : Job {
companion object {
const val KEY: String = "TrimThreadJob"
const val THREAD_ID = "thread_id"
}
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override fun execute() {
val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val threadLengthLimit = TextSecurePreferences.getThreadTrimLength(context)
if (trimmingEnabled) {
MessagingModuleConfiguration.shared.storage.trimThread(threadId, threadLengthLimit)
}
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data {
return Data.Builder()
.putLong(THREAD_ID, threadId)
.build()
}
override fun getFactoryKey(): String = "TrimThreadJob"
class Factory : Job.Factory<TrimThreadJob> {
override fun create(data: Data): TrimThreadJob {
return TrimThreadJob(data.getLong(THREAD_ID))
}
}
}

View File

@ -258,9 +258,6 @@ object OpenGroupAPIV2 {
}
private fun parseMessages(room: String, server: String, rawMessages: List<Map<*, *>>): List<OpenGroupMessageV2> {
val storage = MessagingModuleConfiguration.shared.storage
val lastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
var currentLastMessageServerID = lastMessageServerID
val messages = rawMessages.mapNotNull { json ->
json as Map<String, Any>
try {
@ -275,15 +272,11 @@ object OpenGroupAPIV2 {
Log.d("Loki", "Ignoring message with invalid signature.")
return@mapNotNull null
}
if (message.serverID > lastMessageServerID) {
currentLastMessageServerID = message.serverID
}
message
} catch (e: Exception) {
null
}
}
storage.setLastMessageServerID(room, server, currentLastMessageServerID)
return messages
}
// endregion
@ -404,11 +397,6 @@ object OpenGroupAPIV2 {
val type = TypeFactory.defaultInstance().constructCollectionType(List::class.java, MessageDeletion::class.java)
val idsAsString = JsonUtil.toJson(json["deletions"])
val deletedServerIDs = JsonUtil.fromJson<List<MessageDeletion>>(idsAsString, type) ?: throw Error.ParsingFailed
val lastDeletionServerID = storage.getLastDeletionServerID(roomID, server) ?: 0
val serverID = deletedServerIDs.maxByOrNull { it.id } ?: MessageDeletion.empty
if (serverID.id > lastDeletionServerID) {
storage.setLastDeletionServerID(roomID, server, serverID.id)
}
// Messages
val rawMessages = json["messages"] as? List<Map<String, Any>> ?: return@mapNotNull null
val messages = parseMessages(roomID, server, rawMessages)

View File

@ -5,6 +5,7 @@ import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.utilities.Address
@ -15,6 +16,7 @@ import org.session.libsignal.utilities.successBackground
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.max
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
var hasStarted = false
@ -44,8 +46,8 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses ->
responses.forEach { (room, response) ->
val openGroupID = "$server.$room"
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(openGroupID, response.deletions)
handleNewMessages(room, openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(room, openGroupID, response.deletions)
if (secondToLastJob == null && !isCaughtUp) {
isCaughtUp = true
}
@ -55,8 +57,13 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
}.map { }
}
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
if (!hasStarted) { return }
private fun handleNewMessages(room: String, openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
val storage = MessagingModuleConfiguration.shared.storage
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
// check thread still exists
val threadId = storage.getThreadId(Address.fromSerialized(groupID)) ?: -1
val threadExists = threadId >= 0
if (!hasStarted || !threadExists) { return }
var latestJob: MessageReceiveJob? = null
messages.sortedBy { it.serverID!! }.forEach { message ->
try {
@ -82,9 +89,15 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
Log.e("Loki", "Exception parsing message", e)
}
}
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
if (actualMax > 0) {
storage.setLastMessageServerID(room, server, actualMax)
}
JobQueue.shared.add(TrimThreadJob(threadId))
}
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
private fun handleDeletedMessages(room: String, openGroupID: String, deletedMessageServerIDs: List<Long>) {
val storage = MessagingModuleConfiguration.shared.storage
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
@ -99,5 +112,10 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
}
val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L
val latestMax = deletedMessageServerIDs.maxOrNull() ?: 0L
if (latestMax > currentMax && latestMax != 0L) {
storage.setLastDeletionServerID(room, server, latestMax)
}
}
}