session-ios/SessionMessagingKit/Jobs/JobQueue.swift

114 lines
4.7 KiB
Swift
Raw Permalink Normal View History

2020-11-09 00:58:47 +01:00
import SessionUtilitiesKit
2020-11-08 03:12:38 +01:00
2020-11-10 05:48:47 +01:00
@objc(SNJobQueue)
public final class JobQueue : NSObject, JobDelegate {
2020-11-08 03:12:38 +01:00
2021-04-09 01:47:02 +02:00
private static var jobIDs: [UInt64:UInt64] = [:]
internal static var currentlyExecutingJobs: Set<String> = []
2021-04-09 01:47:02 +02:00
2020-11-10 05:48:47 +01:00
@objc public static let shared = JobQueue()
2020-11-08 03:12:38 +01:00
2020-11-10 05:48:47 +01:00
@objc public func add(_ job: Job, using transaction: Any) {
2020-11-24 05:36:03 +01:00
let transaction = transaction as! YapDatabaseReadWriteTransaction
2020-11-20 04:04:56 +01:00
addWithoutExecuting(job, using: transaction)
2020-11-26 23:07:24 +01:00
transaction.addCompletionQueue(Threading.jobQueue) {
2020-11-24 05:36:03 +01:00
job.execute()
}
2020-11-20 04:04:56 +01:00
}
@objc public func addWithoutExecuting(_ job: Job, using transaction: Any) {
2021-04-09 01:47:02 +02:00
let timestamp = NSDate.millisecondTimestamp()
let count = JobQueue.jobIDs[timestamp] ?? 0
// When adding multiple jobs in rapid succession, timestamps might not be good enough as a unique ID. To
// deal with this we keep track of the number of jobs with a given timestamp and that to the end of the
// timestamp to make it a unique ID. We can't use a random number because we do still want to keep track
// of the order in which the jobs were added.
let id = String(timestamp) + String(count)
job.id = id
JobQueue.jobIDs[timestamp] = count + 1
2020-12-02 06:25:16 +01:00
SNMessagingKitConfiguration.shared.storage.persist(job, using: transaction)
2020-11-08 03:12:38 +01:00
job.delegate = self
}
@objc public func resumePendingJobs() {
let allJobTypes: [Job.Type] = [ AttachmentDownloadJob.self, AttachmentUploadJob.self, MessageReceiveJob.self, MessageSendJob.self, NotifyPNServerJob.self ]
allJobTypes.forEach { type in
2020-12-02 06:25:16 +01:00
let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type)
2020-11-26 23:27:20 +01:00
allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first
guard !JobQueue.currentlyExecutingJobs.contains(job.id!) else {
return SNLog("Not resuming already executing job.")
}
2020-11-26 23:27:20 +01:00
SNLog("Resuming pending job of type: \(type).")
job.delegate = self
job.execute()
}
}
}
2020-11-08 03:12:38 +01:00
public func handleJobSucceeded(_ job: Job) {
2021-07-12 01:19:59 +02:00
given(job.id) { JobQueue.currentlyExecutingJobs.remove($0) }
2020-12-07 06:00:21 +01:00
SNMessagingKitConfiguration.shared.storage.write(with: { transaction in
2020-12-02 06:25:16 +01:00
SNMessagingKitConfiguration.shared.storage.markJobAsSucceeded(job, using: transaction)
2020-11-08 03:54:40 +01:00
}, completion: {
// Do nothing
})
2020-11-08 03:12:38 +01:00
}
public func handleJobFailed(_ job: Job, with error: Error) {
2021-07-12 01:19:59 +02:00
given(job.id) { JobQueue.currentlyExecutingJobs.remove($0) }
2020-11-08 03:54:40 +01:00
job.failureCount += 1
2020-12-02 06:25:16 +01:00
let storage = SNMessagingKitConfiguration.shared.storage
guard !storage.isJobCanceled(job) else { return SNLog("\(type(of: job)) canceled.") }
2020-12-07 06:00:21 +01:00
storage.write(with: { transaction in
2020-11-08 03:54:40 +01:00
storage.persist(job, using: transaction)
}, completion: { // Intentionally capture self
if job.failureCount == type(of: job).maxFailureCount {
2020-12-07 06:00:21 +01:00
storage.write(with: { transaction in
2020-11-08 03:54:40 +01:00
storage.markJobAsFailed(job, using: transaction)
}, completion: {
// Do nothing
})
} else {
let retryInterval = self.getRetryInterval(for: job)
SNLog("\(type(of: job)) failed; scheduling retry (failure count is \(job.failureCount)).")
2020-11-26 23:07:24 +01:00
Timer.scheduledTimer(timeInterval: retryInterval, target: self, selector: #selector(self.retry(_:)), userInfo: job, repeats: false)
2020-11-08 03:54:40 +01:00
}
})
}
2020-11-18 05:36:51 +01:00
public func handleJobFailedPermanently(_ job: Job, with error: Error) {
2021-07-12 01:19:59 +02:00
given(job.id) { JobQueue.currentlyExecutingJobs.remove($0) }
2020-11-18 05:36:51 +01:00
job.failureCount += 1
2020-12-02 06:25:16 +01:00
let storage = SNMessagingKitConfiguration.shared.storage
2020-12-07 06:00:21 +01:00
storage.write(with: { transaction in
2020-11-18 05:36:51 +01:00
storage.persist(job, using: transaction)
}, completion: { // Intentionally capture self
2020-12-07 06:00:21 +01:00
storage.write(with: { transaction in
2020-11-18 05:36:51 +01:00
storage.markJobAsFailed(job, using: transaction)
}, completion: {
// Do nothing
})
})
}
2020-11-08 03:54:40 +01:00
private func getRetryInterval(for job: Job) -> TimeInterval {
// Arbitrary backoff factor...
2021-02-09 23:51:28 +01:00
// try 1 delay: 0.5s
// try 2 delay: 1s
2020-11-08 03:54:40 +01:00
// ...
2021-02-09 23:51:28 +01:00
// try 5 delay: 16s
2020-11-08 03:54:40 +01:00
// ...
2021-02-09 23:51:28 +01:00
// try 11 delay: 512s
let maxBackoff: Double = 10 * 60 // 10 minutes
return 0.25 * min(maxBackoff, pow(2, Double(job.failureCount)))
2020-11-08 03:54:40 +01:00
}
2020-11-26 23:27:20 +01:00
@objc private func retry(_ timer: Timer) {
guard let job = timer.userInfo as? Job else { return }
SNLog("Retrying \(type(of: job)).")
2020-12-01 01:40:37 +01:00
job.delegate = self
2020-11-08 03:54:40 +01:00
job.execute()
2020-11-08 03:12:38 +01:00
}
}