session-android/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt

117 lines
4.4 KiB
Kotlin
Raw Normal View History

2020-12-02 06:39:02 +01:00
package org.session.libsession.messaging.jobs
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2020-12-10 05:31:38 +01:00
import org.session.libsession.messaging.MessagingConfiguration
2021-02-03 02:22:40 +01:00
import org.session.libsignal.utilities.logging.Log
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
2020-12-02 06:39:02 +01:00
import kotlin.concurrent.schedule
import kotlin.math.min
import kotlin.math.pow
2020-12-02 06:39:02 +01:00
import kotlin.math.roundToLong
2020-11-25 02:06:41 +01:00
class JobQueue : JobDelegate {
2020-12-02 06:39:02 +01:00
private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
val timer = Timer()
init {
// process jobs
scope.launch(dispatcher) {
while (isActive) {
queue.receive().let { job ->
job.delegate = this@JobQueue
job.execute()
}
}
}
}
2020-12-02 06:39:02 +01:00
companion object {
@JvmStatic
2020-12-02 06:39:02 +01:00
val shared: JobQueue by lazy { JobQueue() }
}
fun add(job: Job) {
addWithoutExecuting(job)
queue.offer(job) // offer always called on unlimited capacity
2020-12-02 06:39:02 +01:00
}
private fun addWithoutExecuting(job: Job) {
// When adding multiple jobs in rapid succession, timestamps might not be good enough as a unique ID. To
// deal with this we keep track of the number of jobs with a given timestamp and that to the end of the
// timestamp to make it a unique ID. We can't use a random number because we do still want to keep track
// of the order in which the jobs were added.
val currentTime = System.currentTimeMillis()
jobTimestampMap.putIfAbsent(currentTime, AtomicInteger())
job.id = currentTime.toString() + jobTimestampMap[currentTime]!!.getAndIncrement().toString()
MessagingConfiguration.shared.storage.persistJob(job)
2020-12-02 06:39:02 +01:00
}
fun resumePendingJobs() {
if (hasResumedPendingJobs) {
Log.d("Loki", "resumePendingJobs() should only be called once.")
return
}
hasResumedPendingJobs = true
val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY)
2020-12-02 06:39:02 +01:00
allJobTypes.forEach { type ->
2020-12-10 05:31:38 +01:00
val allPendingJobs = MessagingConfiguration.shared.storage.getAllPendingJobs(type)
2020-12-02 06:39:02 +01:00
allPendingJobs.sortedBy { it.id }.forEach { job ->
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
queue.offer(job) // offer always called on unlimited capacity
2020-12-02 06:39:02 +01:00
}
}
}
override fun handleJobSucceeded(job: Job) {
2020-12-10 05:31:38 +01:00
MessagingConfiguration.shared.storage.markJobAsSucceeded(job)
2020-12-02 06:39:02 +01:00
}
override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1
2020-12-10 05:31:38 +01:00
val storage = MessagingConfiguration.shared.storage
2020-12-02 06:39:02 +01:00
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
storage.persistJob(job)
2020-12-02 06:39:02 +01:00
if (job.failureCount == job.maxFailureCount) {
storage.markJobAsFailed(job)
} else {
val retryInterval = getRetryInterval(job)
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
timer.schedule(delay = retryInterval) {
2020-12-02 06:39:02 +01:00
Log.i("Jobs", "Retrying ${job::class.simpleName}.")
2021-03-15 23:44:55 +01:00
queue.offer(job)
2020-12-02 06:39:02 +01:00
}
}
}
override fun handleJobFailedPermanently(job: Job, error: Exception) {
job.failureCount += 1
2020-12-10 05:31:38 +01:00
val storage = MessagingConfiguration.shared.storage
storage.persistJob(job)
2020-12-02 06:39:02 +01:00
storage.markJobAsFailed(job)
}
private fun getRetryInterval(job: Job): Long {
// Arbitrary backoff factor...
// try 1 delay: 0.5s
// try 2 delay: 1s
2020-12-02 06:39:02 +01:00
// ...
// try 5 delay: 16s
2020-12-02 06:39:02 +01:00
// ...
// try 11 delay: 512s
val maxBackoff = (10 * 60).toDouble() // 10 minutes
return (1000 * 0.25 * min(maxBackoff, (2.0).pow(job.failureCount))).roundToLong()
2020-12-02 06:39:02 +01:00
}
2020-11-25 02:06:41 +01:00
}