diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index b2b18f0d5..e5c270b4a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -207,7 +207,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc } startPollingIfNeeded(); - OpenGroupManager.INSTANCE.setAllCaughtUp(false); OpenGroupManager.INSTANCE.startPolling(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index d1e620a11..167bd522a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -4,10 +4,7 @@ import android.content.Context import android.net.Uri import okhttp3.HttpUrl import org.session.libsession.database.StorageProtocol -import org.session.libsession.messaging.jobs.AttachmentUploadJob -import org.session.libsession.messaging.jobs.Job -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageSendJob +import org.session.libsession.messaging.jobs.* import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.signal.* import org.session.libsession.messaging.messages.signal.IncomingTextMessage @@ -210,6 +207,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) } + override fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob? { + return DatabaseFactory.getSessionJobDatabase(context).getMessageReceiveJob(messageReceiveJobID) + } + override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) { val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return JobQueue.shared.add(job) diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt index 20bc37e55..b2f5bd6ef 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/OpenGroupManager.kt @@ -19,7 +19,22 @@ object OpenGroupManager { private var pollers = mutableMapOf() // One for each server private var isPolling = false - var isAllCaughtUp = false + val isAllCaughtUp: Boolean + get() { + pollers.values.forEach { poller -> + val jobID = poller.secondLastJob?.id + jobID?.let { + val storage = MessagingModuleConfiguration.shared.storage + if (storage.getMessageReceivedJob(jobID) == null) { + // If the second last job is done, it means we are now handling the last job + poller.isCaughtUp = true + poller.secondLastJob = null + } + } + if (!poller.isCaughtUp) { return false } + } + return true + } fun startPolling() { if (isPolling) { return } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index 6d070865a..a0ca59dd0 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -71,6 +71,13 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa } } + fun getMessageReceiveJob(messageReceiveJobID: String): MessageReceiveJob? { + val database = databaseHelper.readableDatabase + return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf( messageReceiveJobID, MessageReceiveJob.KEY )) { cursor -> + jobFromCursor(cursor) as MessageReceiveJob? + } + } + fun cancelPendingMessageSendJobs(threadID: Long) { val database = databaseHelper.writableDatabase val attachmentUploadJobKeys = mutableListOf() diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/DefaultMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/DefaultMessageNotifier.java index 55dd3f17a..532c95315 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/DefaultMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/DefaultMessageNotifier.java @@ -287,9 +287,6 @@ public class DefaultMessageNotifier implements MessageNotifier { } finally { if (telcoCursor != null) telcoCursor.close(); if (pushCursor != null) pushCursor.close(); - if (!OpenGroupManager.INSTANCE.isAllCaughtUp()) { - OpenGroupManager.INSTANCE.setAllCaughtUp(true); - } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java index 448d977da..d60d40b20 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/OptimizedMessageNotifier.java @@ -10,6 +10,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.utilities.recipients.Recipient; import org.session.libsession.utilities.Debouncer; +import org.session.libsignal.utilities.Log; import org.session.libsignal.utilities.ThreadUtils; import org.thoughtcrime.securesms.ApplicationContext; import org.thoughtcrime.securesms.loki.api.OpenGroupManager; @@ -23,7 +24,7 @@ public class OptimizedMessageNotifier implements MessageNotifier { @MainThread public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) { this.wrapped = wrapped; - this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(1)); + this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(2)); } @Override @@ -66,6 +67,8 @@ public class OptimizedMessageNotifier implements MessageNotifier { } isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); + + Log.d("Ryan", "Is caught up? " + isCaughtUp); if (isCaughtUp) { performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId)); diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index 103448fbb..9b1956890 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -5,6 +5,7 @@ import android.content.Context import android.net.Uri import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.Job +import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.visible.Attachment @@ -49,6 +50,7 @@ interface StorageProtocol { fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? + fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob? fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun isJobCanceled(job: Job): Boolean diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt index 320c3afd9..d3131466a 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { var hasStarted = false + var isCaughtUp = false + var secondLastJob: MessageReceiveJob? = null private var future: ScheduledFuture<*>? = null companion object { @@ -43,6 +45,9 @@ class OpenGroupPollerV2(private val server: String, private val executorService: val openGroupID = "$server.$room" handleNewMessages(openGroupID, response.messages, isBackgroundPoll) handleDeletedMessages(openGroupID, response.deletions) + if (secondLastJob == null && !isCaughtUp) { + isCaughtUp = true + } } }.always { executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS) @@ -51,6 +56,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService: private fun handleNewMessages(openGroupID: String, messages: List, isBackgroundPoll: Boolean) { if (!hasStarted) { return } + var latestJob: MessageReceiveJob? = null messages.sortedBy { it.serverID!! }.forEach { message -> try { val senderPublicKey = message.sender!! @@ -66,11 +72,16 @@ class OpenGroupPollerV2(private val server: String, private val executorService: job.executeAsync() } else { JobQueue.shared.add(job) + if (!isCaughtUp) { + secondLastJob = latestJob + } + latestJob = job } } catch (e: Exception) { Log.e("Loki", "Exception parsing message", e) } } + Log.d("Ryan", "Finish a round of polling in thread $openGroupID") } private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List) {