feat: compact polling server-wide

This commit is contained in:
jubb 2021-05-04 14:51:21 +10:00
parent 2eb612a967
commit f3fa8626ed
2 changed files with 15 additions and 8 deletions

View File

@ -21,7 +21,7 @@ class PublicChatManager(private val context: Context) {
private var chats = mutableMapOf<Long, OpenGroup>()
private var v2Chats = mutableMapOf<Long, OpenGroupV2>()
private val pollers = mutableMapOf<Long, OpenGroupPoller>()
private val v2Pollers = mutableMapOf<Long, OpenGroupV2Poller>()
private val v2Pollers = mutableMapOf<String, OpenGroupV2Poller>()
private val observers = mutableMapOf<Long, ContentObserver>()
private var isPolling = false
private val executorService = Executors.newScheduledThreadPool(4)
@ -53,12 +53,17 @@ class PublicChatManager(private val context: Context) {
listenToThreadDeletion(threadId)
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
}
for ((threadId, chat) in v2Chats) {
val poller = v2Pollers[threadId] ?: OpenGroupV2Poller(listOf(chat), executorService)
v2Pollers.values.forEach { it.stop() }
v2Pollers.clear()
v2Chats.entries.groupBy { (_, group) -> group.server }.forEach { (server, threadedRooms) ->
val poller = OpenGroupV2Poller(threadedRooms.map { it.value }, executorService)
poller.startIfNeeded()
listenToThreadDeletion(threadId)
if (!v2Pollers.containsKey(threadId)) { v2Pollers[threadId] = poller }
threadedRooms.forEach { (thread, _) ->
listenToThreadDeletion(thread)
}
v2Pollers[server] = poller
}
isPolling = true
}
@ -171,7 +176,8 @@ class PublicChatManager(private val context: Context) {
DatabaseFactory.getLokiThreadDatabase(context).removePublicChat(threadID)
pollers.remove(threadID)?.stop()
v2Pollers.remove(threadID)?.stop()
v2Pollers.values.forEach { it.stop() }
v2Pollers.clear()
observers.remove(threadID)
startPollersIfNeeded()
}

View File

@ -66,7 +66,7 @@ class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val e
}
fun compactPoll(isBackgroundPoll: Boolean): Promise<Any, Exception> {
if (isPollOngoing) return Promise.of(Unit)
if (isPollOngoing || !hasStarted) return Promise.of(Unit)
isPollOngoing = true
val server = openGroups.first().server // assume all the same server
val rooms = openGroups.map { it.room }
@ -74,7 +74,7 @@ class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val e
results.forEach { (room, results) ->
val serverRoomId = "$server.$room"
handleDeletedMessages(serverRoomId,results.deletions)
handleNewMessages(serverRoomId, results.messages, isBackgroundPoll)
handleNewMessages(serverRoomId, results.messages.sortedBy { it.serverID }, isBackgroundPoll)
}
}.always {
isPollOngoing = false
@ -86,6 +86,7 @@ class OpenGroupV2Poller(private val openGroups: List<OpenGroupV2>, private val e
}
private fun handleNewMessages(serverRoomId: String, newMessages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
if (!hasStarted) return
newMessages.forEach { message ->
try {
val senderPublicKey = message.sender!!