session-android/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPollerV2.kt

116 lines
4.8 KiB
Kotlin
Raw Normal View History

2021-05-20 07:10:42 +02:00
package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.Promise
2021-05-21 02:24:48 +02:00
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
2021-05-20 07:10:42 +02:00
import org.session.libsession.messaging.MessagingModuleConfiguration
2021-05-21 02:24:48 +02:00
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.snode.SnodeAPI
2021-05-21 03:09:03 +02:00
import org.session.libsession.utilities.GroupUtil
2021-05-21 02:24:48 +02:00
import org.session.libsignal.crypto.getRandomElementOrNull
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.successBackground
2021-05-21 03:09:03 +02:00
import java.util.*
2021-05-21 02:24:48 +02:00
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.min
2021-05-20 07:10:42 +02:00
/**
 * Polls the swarms of closed groups for new messages and hands every received envelope to the
 * [JobQueue] as a [MessageReceiveJob].
 *
 * Each group is polled on its own schedule: the delay before the next poll grows linearly from
 * [MIN_POLL_INTERVAL] to [MAX_POLL_INTERVAL] as the time since the thread was last updated goes
 * from zero to [POLL_INTERVAL_RAMP_DURATION].
 *
 * The polling state is read and written from the scheduler threads and from promise callbacks as
 * well as from callers, so it is kept in concurrent maps.
 */
class ClosedGroupPollerV2 {
    private val executorService = Executors.newScheduledThreadPool(4)

    // Declared as MutableMap so that member calls resolve against the Map interface.
    private val isPolling: MutableMap<String, Boolean> = java.util.concurrent.ConcurrentHashMap()

    /** The pending (not yet executed) poll of each group, so that [stopPolling] can cancel it. */
    private val futures: MutableMap<String, ScheduledFuture<*>> = java.util.concurrent.ConcurrentHashMap()

    class InsufficientSnodesException() : Exception("No snodes left to poll.")
    class PollingCanceledException() : Exception("Polling canceled.")

    private fun isPolling(groupPublicKey: String): Boolean = isPolling[groupPublicKey] ?: false

    /** Starts polling every closed group known to storage. */
    fun start() {
        val storage = MessagingModuleConfiguration.shared.storage
        storage.getAllClosedGroupPublicKeys().forEach { startPolling(it) }
    }

    /**
     * Starts polling the closed group with the given public key. Does nothing if that group is
     * already being polled.
     */
    fun startPolling(groupPublicKey: String) {
        // put() returns the previous value, which makes the "already polling?" check and the
        // update a single atomic step.
        if (isPolling.put(groupPublicKey, true) == true) { return }
        pollThenReschedule(groupPublicKey)
    }

    /** Stops polling every closed group known to storage as well as every group tracked here. */
    fun stop() {
        val storage = MessagingModuleConfiguration.shared.storage
        storage.getAllClosedGroupPublicKeys().forEach { stopPolling(it) }
        // Also cover groups that are still tracked here but are no longer listed in storage.
        isPolling.keys.toList().forEach { stopPolling(it) }
    }

    /** Stops polling the closed group with the given public key and cancels its pending poll. */
    fun stopPolling(groupPublicKey: String) {
        // Clear the flag first so that any callback that is already running stops rescheduling.
        isPolling[groupPublicKey] = false
        futures.remove(groupPublicKey)?.cancel(false)
    }

    /** Polls once and, whether the poll succeeded or failed, schedules the next poll. */
    private fun pollThenReschedule(groupPublicKey: String) {
        poll(groupPublicKey).success {
            pollRecursively(groupPublicKey)
        }.fail {
            // The error is logged in poll().
            pollRecursively(groupPublicKey)
        }
    }

    /**
     * Schedules the next poll for the given group, unless polling was stopped in the meantime.
     * If the group no longer has a thread, polling is stopped so that a later [startPolling]
     * can start it again.
     */
    private fun pollRecursively(groupPublicKey: String) {
        if (!isPolling(groupPublicKey)) { return }
        val storage = MessagingModuleConfiguration.shared.storage
        val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey)
        val threadID = storage.getThreadId(groupID)
        if (threadID == null) {
            Log.d("Loki", "Stopping polling for closed group with public key: $groupPublicKey due to missing thread.")
            stopPolling(groupPublicKey)
            return
        }
        // Get the time since the thread was last updated. If there is no such time yet, pick some
        // reasonable fake time interval to use instead.
        val lastUpdated = storage.getLastUpdated(threadID)
        val timeSinceLastMessage = if (lastUpdated != -1L) System.currentTimeMillis() - lastUpdated else FALLBACK_TIME_SINCE_LAST_MESSAGE
        // Clamp to [0, ramp duration] so that a timestamp in the future can't push the interval
        // below the minimum.
        val clampedTimeSinceLastMessage = timeSinceLastMessage.coerceIn(0L, POLL_INTERVAL_RAMP_DURATION)
        val slope = (MAX_POLL_INTERVAL - MIN_POLL_INTERVAL).toDouble() / POLL_INTERVAL_RAMP_DURATION.toDouble()
        val nextPollInterval = slope * clampedTimeSinceLastMessage + MIN_POLL_INTERVAL
        Log.d("Loki", "Next poll interval for closed group with public key: $groupPublicKey is ${nextPollInterval / 1000} s.")
        // Keep the future so that stopPolling() can cancel the pending poll.
        futures[groupPublicKey] = executorService.schedule(
            Runnable { pollThenReschedule(groupPublicKey) },
            nextPollInterval.toLong(),
            TimeUnit.MILLISECONDS
        )
    }

    /**
     * Fetches the messages of the given closed group from a random snode of its swarm and adds a
     * [MessageReceiveJob] to the [JobQueue] for each of them.
     *
     * @return a promise that resolves once the messages were fetched and parsed. It resolves
     * immediately if the group is not being polled, and fails with [InsufficientSnodesException]
     * if the swarm is empty or [PollingCanceledException] if polling was stopped before the
     * request was made.
     */
    fun poll(groupPublicKey: String): Promise<Unit, Exception> {
        if (!isPolling(groupPublicKey)) { return Promise.of(Unit) }
        val promise = SnodeAPI.getSwarm(groupPublicKey).bind { swarm ->
            val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
            if (!isPolling(groupPublicKey)) { throw PollingCanceledException() }
            SnodeAPI.getRawMessages(snode, groupPublicKey).map { SnodeAPI.parseRawMessagesResponse(it, snode, groupPublicKey) }
        }
        promise.success { envelopes ->
            // Drop the result if polling was stopped while the request was in flight.
            if (!isPolling(groupPublicKey)) { return@success }
            envelopes.forEach { (envelope, serverHash) ->
                val job = MessageReceiveJob(envelope.toByteArray(), serverHash)
                JobQueue.shared.add(job)
            }
        }
        promise.fail {
            Log.d("Loki", "Polling failed for closed group with public key: $groupPublicKey due to error: $it.")
        }
        return promise.map { }
    }

    companion object {
        /** Shortest delay between two polls of a group, in milliseconds. */
        private const val MIN_POLL_INTERVAL = 4 * 1000L

        /** Longest delay between two polls of a group, in milliseconds. */
        private const val MAX_POLL_INTERVAL = 4 * 60 * 1000L

        /** Time since the last update after which the longest delay is used, in milliseconds. */
        private const val POLL_INTERVAL_RAMP_DURATION = 12 * 60 * 60 * 1000L

        /** Time since the last update to assume when storage reports none, in milliseconds. */
        private const val FALLBACK_TIME_SINCE_LAST_MESSAGE = 5 * 60 * 1000L

        @JvmStatic
        val shared = ClosedGroupPollerV2()
    }
}