Merge pull request #554 from RyanRory/fix-open-group-spam

Fix Open Group Notification Spam Part 2
This commit is contained in:
Niels Andriesse 2021-05-24 16:14:25 +10:00 committed by GitHub
commit ba116df195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 11 deletions

View File

@ -208,8 +208,7 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
poller.setCaughtUp(false); poller.setCaughtUp(false);
} }
startPollingIfNeeded(); startPollingIfNeeded();
OpenGroupManager.INSTANCE.setAllCaughtUp(false);
OpenGroupManager.INSTANCE.startPolling(); OpenGroupManager.INSTANCE.startPolling();
} }

View File

@ -3,11 +3,8 @@ package org.thoughtcrime.securesms.database
import android.content.Context import android.content.Context
import android.net.Uri import android.net.Uri
import org.session.libsession.database.StorageProtocol import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.jobs.*
import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.signal.* import org.session.libsession.messaging.messages.signal.*
import org.session.libsession.messaging.messages.signal.IncomingTextMessage import org.session.libsession.messaging.messages.signal.IncomingTextMessage
@ -191,6 +188,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID)
} }
override fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob? {
return DatabaseFactory.getSessionJobDatabase(context).getMessageReceiveJob(messageReceiveJobID)
}
override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) { override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) {
val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return
JobQueue.shared.add(job) JobQueue.shared.add(job)

View File

@ -20,7 +20,22 @@ object OpenGroupManager {
private var pollers = mutableMapOf<String, OpenGroupPollerV2>() // One for each server private var pollers = mutableMapOf<String, OpenGroupPollerV2>() // One for each server
private var isPolling = false private var isPolling = false
var isAllCaughtUp = false val isAllCaughtUp: Boolean
get() {
pollers.values.forEach { poller ->
val jobID = poller.secondLastJob?.id
jobID?.let {
val storage = MessagingModuleConfiguration.shared.storage
if (storage.getMessageReceivedJob(jobID) == null) {
// If the second last job is done, it means we are now handling the last job
poller.isCaughtUp = true
poller.secondLastJob = null
}
}
if (!poller.isCaughtUp) { return false }
}
return true
}
fun startPolling() { fun startPolling() {
if (isPolling) { return } if (isPolling) { return }

View File

@ -71,6 +71,13 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
} }
} }
fun getMessageReceiveJob(messageReceiveJobID: String): MessageReceiveJob? {
val database = databaseHelper.readableDatabase
return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf( messageReceiveJobID, MessageReceiveJob.KEY )) { cursor ->
jobFromCursor(cursor) as MessageReceiveJob?
}
}
fun cancelPendingMessageSendJobs(threadID: Long) { fun cancelPendingMessageSendJobs(threadID: Long) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val attachmentUploadJobKeys = mutableListOf<String>() val attachmentUploadJobKeys = mutableListOf<String>()

View File

@ -287,9 +287,6 @@ public class DefaultMessageNotifier implements MessageNotifier {
} finally { } finally {
if (telcoCursor != null) telcoCursor.close(); if (telcoCursor != null) telcoCursor.close();
if (pushCursor != null) pushCursor.close(); if (pushCursor != null) pushCursor.close();
if (!OpenGroupManager.INSTANCE.isAllCaughtUp()) {
OpenGroupManager.INSTANCE.setAllCaughtUp(true);
}
} }
} }

View File

@ -10,6 +10,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN
import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.utilities.recipients.Recipient; import org.session.libsession.utilities.recipients.Recipient;
import org.session.libsession.utilities.Debouncer; import org.session.libsession.utilities.Debouncer;
import org.session.libsignal.utilities.Log;
import org.session.libsignal.utilities.ThreadUtils; import org.session.libsignal.utilities.ThreadUtils;
import org.thoughtcrime.securesms.ApplicationContext; import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.loki.api.OpenGroupManager; import org.thoughtcrime.securesms.loki.api.OpenGroupManager;
@ -23,7 +24,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
@MainThread @MainThread
public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) { public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) {
this.wrapped = wrapped; this.wrapped = wrapped;
this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(1)); this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(2));
} }
@Override @Override
@ -66,6 +67,8 @@ public class OptimizedMessageNotifier implements MessageNotifier {
} }
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp(); isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
Log.d("Ryan", "Is caught up? " + isCaughtUp);
if (isCaughtUp) { if (isCaughtUp) {
performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId)); performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId));

View File

@ -5,6 +5,7 @@ import android.net.Uri
import org.session.libsession.messaging.contacts.Contact import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.control.ConfigurationMessage import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.visible.Attachment import org.session.libsession.messaging.messages.visible.Attachment
@ -43,6 +44,7 @@ interface StorageProtocol {
fun getAllPendingJobs(type: String): Map<String,Job?> fun getAllPendingJobs(type: String): Map<String,Job?>
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String) fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
fun isJobCanceled(job: Job): Boolean fun isJobCanceled(job: Job): Boolean

View File

@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
var hasStarted = false var hasStarted = false
var isCaughtUp = false
var secondLastJob: MessageReceiveJob? = null
private var future: ScheduledFuture<*>? = null private var future: ScheduledFuture<*>? = null
companion object { companion object {
@ -44,6 +46,9 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
val openGroupID = "$server.$room" val openGroupID = "$server.$room"
handleNewMessages(openGroupID, response.messages, isBackgroundPoll) handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
handleDeletedMessages(openGroupID, response.deletions) handleDeletedMessages(openGroupID, response.deletions)
if (secondLastJob == null && !isCaughtUp) {
isCaughtUp = true
}
} }
}.always { }.always {
executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS) executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS)
@ -52,6 +57,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) { private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
if (!hasStarted) { return } if (!hasStarted) { return }
var latestJob: MessageReceiveJob? = null
messages.sortedBy { it.serverID!! }.forEach { message -> messages.sortedBy { it.serverID!! }.forEach { message ->
try { try {
val senderPublicKey = message.sender!! val senderPublicKey = message.sender!!
@ -67,11 +73,16 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
job.executeAsync() job.executeAsync()
} else { } else {
JobQueue.shared.add(job) JobQueue.shared.add(job)
if (!isCaughtUp) {
secondLastJob = latestJob
}
latestJob = job
} }
} catch (e: Exception) { } catch (e: Exception) {
Log.e("Loki", "Exception parsing message", e) Log.e("Loki", "Exception parsing message", e)
} }
} }
Log.d("Ryan", "Finish a round of polling in thread $openGroupID")
} }
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) { private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {