diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 87fa307e5..1facc4a68 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -442,7 +442,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO poller.setUserPublicKey(userPublicKey); return; } - poller = new Poller(configFactory, new Timer()); + poller = new Poller(configFactory); } public void startPollingIfNeeded() { diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index a901fd141..1803dd8c8 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -19,12 +19,11 @@ import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.jobs.BatchMessageReceiveJob import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.MessageReceiveParameters -import org.session.libsession.messaging.messages.control.SharedConfigurationMessage -import org.session.libsession.messaging.sending_receiving.MessageReceiver import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeModule import org.session.libsession.utilities.ConfigFactoryProtocol +import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Namespace import org.session.libsignal.utilities.Snode @@ -35,7 +34,7 @@ import kotlin.time.Duration.Companion.days private class PromiseCanceledException : Exception("Promise canceled.") -class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Timer) { +class Poller(private val configFactory: ConfigFactoryProtocol) { var userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: "" private var hasStarted: Boolean = false private val usedSnodes: MutableSet = mutableSetOf() @@ -126,34 +125,26 @@ class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Ti private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) { if (forConfigObject == null) return - val messages = SnodeAPI.parseRawMessagesResponse( - rawMessages, - snode, - userPublicKey, - namespace, - updateLatestHash = true, - updateStoredHashes = true, - ) + val messages = rawMessages["messages"] as? List<*> + val processed = if (messages != null && messages.isNotEmpty()) { + SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) + SnodeAPI.removeDuplicates(userPublicKey, messages, namespace, true).mapNotNull { messageBody -> + val rawMessageAsJSON = messageBody as? Map<*, *> ?: return@mapNotNull null + val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null + val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null + val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset + val body = Base64.decode(b64EncodedBody) + Triple(body, hashValue, timestamp) + } + } else emptyList() - if (messages.isEmpty()) { - // no new messages to process - return - } + if (processed.isEmpty()) return var latestMessageTimestamp: Long? = null - messages.forEach { (envelope, hash) -> + processed.forEach { (body, hash, timestamp) -> try { - val (message, _) = MessageReceiver.parse(data = envelope.toByteArray(), - // assume no groups in personal poller messages - openGroupServerID = null, currentClosedGroups = emptySet() - ) - // sanity checks - if (message !is SharedConfigurationMessage) { - Log.w("Loki", "shared config message handled in configs wasn't SharedConfigurationMessage but was ${message.javaClass.simpleName}") - return@forEach - } - forConfigObject.merge(hash!! to message.data) - latestMessageTimestamp = if ((message.sentTimestamp ?: 0L) > (latestMessageTimestamp ?: 0L)) { message.sentTimestamp } else { latestMessageTimestamp } + forConfigObject.merge(hash to body) + latestMessageTimestamp = if (timestamp > (latestMessageTimestamp ?: 0L)) { timestamp } else { latestMessageTimestamp } } catch (e: Exception) { Log.e("Loki", e) } diff --git a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 42eacf7b2..518b20c15 100644 --- a/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/libsession/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -872,7 +872,7 @@ object SnodeAPI { } } - private fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { + fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *> val hashValue = lastMessageAsJSON?.get("hash") as? String if (hashValue != null) { @@ -882,7 +882,7 @@ object SnodeAPI { } } - private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> { + fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> { val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf() val receivedMessageHashValues = originalMessageHashValues.toMutableSet() val result = rawMessages.filter { rawMessage ->