2021-04-13 09:17:16 +02:00
|
|
|
package org.session.libsession.messaging.sending_receiving.pollers
|
|
|
|
|
|
|
|
import nl.komponents.kovenant.Promise
|
|
|
|
import nl.komponents.kovenant.deferred
|
|
|
|
import org.session.libsession.messaging.MessagingConfiguration
|
|
|
|
import org.session.libsession.messaging.jobs.JobQueue
|
|
|
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
2021-04-20 09:22:36 +02:00
|
|
|
import org.session.libsession.messaging.opengroups.OpenGroupAPIV2
|
|
|
|
import org.session.libsession.messaging.opengroups.OpenGroupV2
|
2021-04-13 09:17:16 +02:00
|
|
|
import org.session.libsignal.service.internal.push.SignalServiceProtos
|
|
|
|
import org.session.libsignal.utilities.logging.Log
|
|
|
|
import org.session.libsignal.utilities.successBackground
|
|
|
|
import java.util.concurrent.ScheduledExecutorService
|
|
|
|
import java.util.concurrent.ScheduledFuture
|
|
|
|
import java.util.concurrent.TimeUnit
|
|
|
|
|
|
|
|
/**
 * Polls a single V2 open group room for new messages, deleted messages and moderators.
 *
 * Periodic polling is scheduled on [executorService] by [startIfNeeded] and cancelled by [stop].
 * When [executorService] is `null`, nothing is ever scheduled and only explicit calls to
 * [pollForNewMessages] perform any work.
 *
 * @param openGroup the open group whose `room` / `server` are polled.
 * @param executorService scheduler for the periodic polls, or `null` to disable scheduling.
 */
class OpenGroupV2Poller(private val openGroup: OpenGroupV2, private val executorService: ScheduledExecutorService? = null) {

    // True between a successful startIfNeeded() and the next stop().
    private var hasStarted = false

    // Guards against overlapping new-message polls; written from the poll callbacks.
    @Volatile private var isPollOngoing = false

    /** Set to `true` once a new-message poll has been fetched and dispatched; never reset by this class. */
    var isCaughtUp = false

    // Handles of the scheduled periodic tasks, kept so stop() can cancel them.
    private val cancellableFutures = mutableListOf<ScheduledFuture<out Any>>()

    // region Settings
    companion object {
        // All intervals are in milliseconds.
        private const val POLL_FOR_NEW_MESSAGES_INTERVAL: Long = 10 * 1000L
        private const val POLL_FOR_DELETED_MESSAGES_INTERVAL: Long = 60 * 1000L
        private const val POLL_FOR_MODERATORS_INTERVAL: Long = 10 * 60 * 1000L
    }
    // endregion

    // region Lifecycle
    /**
     * Schedules the three periodic polls (new messages, deleted messages, moderators) with no
     * initial delay. Does nothing if polling has already been started or if no executor service
     * was supplied.
     */
    fun startIfNeeded() {
        if (hasStarted || executorService == null) return
        cancellableFutures += listOf(
            executorService.scheduleAtFixedRate(::pollForNewMessages, 0, POLL_FOR_NEW_MESSAGES_INTERVAL, TimeUnit.MILLISECONDS),
            executorService.scheduleAtFixedRate(::pollForDeletedMessages, 0, POLL_FOR_DELETED_MESSAGES_INTERVAL, TimeUnit.MILLISECONDS),
            executorService.scheduleAtFixedRate(::pollForModerators, 0, POLL_FOR_MODERATORS_INTERVAL, TimeUnit.MILLISECONDS),
        )
        hasStarted = true
    }

    /**
     * Cancels every scheduled poll and allows [startIfNeeded] to schedule them again.
     * Tasks that are currently executing are not interrupted.
     */
    fun stop() {
        cancellableFutures.forEach { future ->
            future.cancel(false)
        }
        cancellableFutures.clear()
        hasStarted = false
    }
    // endregion

    // region Polling
    /**
     * Fetches new messages for the room and enqueues a [MessageReceiveJob] for each of them.
     *
     * @return a promise that resolves once the fetched messages have been enqueued, is rejected if
     * the fetch fails, and resolves immediately if another poll is already in progress.
     */
    fun pollForNewMessages(): Promise<Unit, Exception> {
        return pollForNewMessages(false)
    }

    /**
     * Fetches new messages and turns each one into a [MessageReceiveJob].
     *
     * @param isBackgroundPoll when `true`, every job is executed right away and the returned
     * promise resolves only after all of them have finished; when `false`, jobs are handed to
     * [JobQueue] and the promise resolves as soon as they have been enqueued.
     * @return a promise that is resolved exactly once on success, rejected with the fetch error on
     * failure, or already resolved if a poll is ongoing.
     */
    private fun pollForNewMessages(isBackgroundPoll: Boolean): Promise<Unit, Exception> {
        if (isPollOngoing) { return Promise.of(Unit) }
        isPollOngoing = true
        val deferred = deferred<Unit, Exception>()
        // Outstanding units of work that must complete before `deferred` is resolved. Starts at 1
        // for the scheduling pass itself so the promise cannot resolve while jobs are still being
        // started; every background job adds one more.
        val pendingWork = java.util.concurrent.atomic.AtomicInteger(1)
        fun markWorkDone() {
            // Only the last finisher resolves, so the deferred is never completed twice.
            if (pendingWork.decrementAndGet() == 0) deferred.resolve(Unit)
        }
        // Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
        OpenGroupAPIV2.getMessages(openGroup.room, openGroup.server).successBackground { messages ->
            // Process messages in the background
            Log.d("Loki", "received ${messages.size} messages")
            messages.forEach { message ->
                try {
                    // A message without a sender throws here and is skipped by the catch below.
                    val senderPublicKey = message.sender!!
                    // Main message
                    val dataMessageProto = message.toProto()
                    // Content
                    val content = SignalServiceProtos.Content.newBuilder()
                    content.dataMessage = dataMessageProto
                    // Envelope
                    val builder = SignalServiceProtos.Envelope.newBuilder()
                    builder.type = SignalServiceProtos.Envelope.Type.UNIDENTIFIED_SENDER
                    builder.source = senderPublicKey
                    builder.sourceDevice = 1
                    builder.content = content.build().toByteString()
                    builder.timestamp = message.sentTimestamp
                    val envelope = builder.build()
                    val job = MessageReceiveJob(envelope.toByteArray(), isBackgroundPoll, message.serverID, openGroup.id)
                    Log.d("Loki", "Scheduling Job $job")
                    if (isBackgroundPoll) {
                        // The promise is just used to keep track of when we're done.
                        // Count the job only after it has actually been started, so a throwing
                        // executeAsync() cannot leave the counter permanently above zero.
                        val jobPromise = job.executeAsync()
                        pendingWork.incrementAndGet()
                        jobPromise.always { markWorkDone() }
                    } else {
                        JobQueue.shared.add(job)
                    }
                } catch (e: Exception) {
                    Log.e("Loki", "Exception parsing message", e)
                }
            }
            isCaughtUp = true
            isPollOngoing = false
            // Release the scheduling pass's own unit of work.
            markWorkDone()
        }.fail { exception ->
            Log.e("Loki", "Failed to get messages for group chat with room: ${openGroup.room} on server: ${openGroup.server}.", exception)
            isPollOngoing = false
            // Settle the promise so callers waiting on it are not left hanging forever.
            deferred.reject(exception)
        }
        return deferred.promise
    }

    /**
     * Fetches the server IDs of deleted messages, maps them to local message IDs (server IDs
     * without a local counterpart are skipped) and deletes those messages locally.
     */
    private fun pollForDeletedMessages() {
        OpenGroupAPIV2.getDeletedMessages(openGroup.room, openGroup.server).success { deletedMessageServerIDs ->
            val messageDataProvider = MessagingConfiguration.shared.messageDataProvider
            deletedMessageServerIDs
                .mapNotNull { serverID -> messageDataProvider.getMessageID(serverID) }
                .forEach { messageID -> messageDataProvider.deleteMessage(messageID) }
        }.fail { exception ->
            // Keep the cause in the log; a bare message makes failed polls impossible to diagnose.
            Log.e("Loki", "Failed to get deleted messages for group chat with ID: ${openGroup.room} on server: ${openGroup.server}.", exception)
        }
    }

    /**
     * Requests the moderator list for the room. The result is not used here.
     * NOTE(review): presumably OpenGroupAPIV2 stores the moderators itself — confirm.
     */
    private fun pollForModerators() {
        OpenGroupAPIV2.getModerators(openGroup.room, openGroup.server)
    }
    // endregion
}
|