session-ios/SignalUtilitiesKit/JobQueue.swift

304 lines
11 KiB
Swift

//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
import Foundation
/// JobQueue - A durable work queue
///
/// When work needs to be done, add it to the JobQueue.
/// The JobQueue will persist a JobRecord to be sure that work can be restarted if the app is killed.
///
/// The actual work is carried out in a DurableOperation, which the JobQueue spins off based on the contents
/// of a JobRecord.
///
/// For a concrete example, take message sending.
/// Add an outgoing message to the MessageSenderJobQueue, which first records a SSKMessageSenderJobRecord.
/// The MessageSenderJobQueue then uses that SSKMessageSenderJobRecord to create a MessageSenderOperation which
/// takes care of the actual business of communicating with the service.
///
/// DurableOperations are retryable - via their `remainingRetries` logic. However, if the operation encounters
/// an error where `error.isRetryable == false`, the operation will fail, regardless of available retries.
public extension Error {
    /// Whether an operation that hit this error may be retried.
    ///
    /// The value is read from the error's `NSError` bridge.
    var isRetryable: Bool {
        let bridgedError = self as NSError
        return bridgedError.isRetryable
    }
}
extension SSKJobRecordStatus: CustomStringConvertible {
    /// Human-readable name of the status, for use in log output.
    public var description: String {
        let label: String
        switch self {
        case .unknown: label = "unknown"
        case .ready: label = "ready"
        case .running: label = "running"
        case .permanentlyFailed: label = "permanentlyFailed"
        case .obsolete: label = "obsolete"
        }
        return label
    }
}
/// Errors that `JobQueue.workStep()` handles specially when they are thrown while starting a job.
public enum JobError: Error {
/// An invariant was violated; `workStep()` marks the job record as permanently failed.
case assertionFailure(description: String)
/// The job no longer needs to run; `workStep()` marks the job record as obsolete.
case obsolete(description: String)
}
/// A retryable unit of work that is backed by a persisted job record.
///
/// A `JobQueue` builds one of these from a job record, assigns itself as the
/// delegate, seeds `remainingRetries`, and then enqueues `operation`.
///
/// The protocol is class-bound (`AnyObject` is the non-deprecated spelling of
/// the former `class` constraint) because queues compare operations by identity.
public protocol DurableOperation: AnyObject {
    /// The concrete job record type this operation is built from.
    associatedtype JobRecordType: SSKJobRecord
    /// The delegate type that receives this operation's outcome callbacks.
    associatedtype DurableOperationDelegateType: DurableOperationDelegate

    /// The persisted record describing the work this operation performs.
    var jobRecord: JobRecordType { get }
    /// Receiver of success / error / failure callbacks; set by the owning `JobQueue`.
    var durableOperationDelegate: DurableOperationDelegateType? { get set }
    /// The underlying operation that is added to the queue's `OperationQueue`.
    var operation: OWSOperation { get }
    /// How many more attempts are allowed; set by the owning `JobQueue` before enqueueing.
    var remainingRetries: UInt { get set }
}
/// Receives the outcome of a `DurableOperation`.
///
/// Every callback is handed a read-write transaction so the receiver can
/// update the operation's job record as part of handling the event.
///
/// The protocol is class-bound; `AnyObject` is the non-deprecated spelling of
/// the former `class` constraint.
public protocol DurableOperationDelegate: AnyObject {
    /// The operation type this delegate handles.
    associatedtype DurableOperationType: DurableOperation

    /// The operation completed successfully.
    func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction)
    /// The operation reported an error. (`JobQueue`'s default implementation records a failure on the job record.)
    func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction)
    /// The operation failed for good. (`JobQueue`'s default implementation marks the job record permanently failed.)
    func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction)
}
/// A durable work queue: persists job records, turns them into `DurableOperation`s,
/// and serves as the delegate for the operations it creates.
///
/// Members under "Default Implementations" are provided by the `JobQueue` extension in this file;
/// members under "Required" must be supplied by the conforming type.
public protocol JobQueue: DurableOperationDelegate {
// The queue is the delegate of its own operations (`workStep()` assigns `self` as the delegate).
typealias DurableOperationDelegateType = Self
typealias JobRecordType = DurableOperationType.JobRecordType
// MARK: Dependencies
var dbConnection: YapDatabaseConnection { get }
/// Used to look up this queue's job records by label and status.
var finder: JobRecordFinder { get }
// MARK: Default Implementations
/// Saves `jobRecord` (which must be in the `ready` state) and schedules work once the transaction completes.
func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)
/// Resets records left in the `running` state back to `ready`.
func restartOldJobs()
/// Starts the next `ready` job, if any, then schedules itself again.
func workStep()
/// Shared setup logic intended to be called from `setup()`.
func defaultSetup()
// MARK: Required
/// Operations that have been enqueued and have not yet succeeded or permanently failed.
var runningOperations: [DurableOperationType] { get set }
/// Label identifying this queue's records when querying `finder`.
var jobRecordLabel: String { get }
/// Set to `true` by `defaultSetup()`; `workStep()` does nothing until it is.
var isSetup: Bool { get set }
func setup()
/// Called by `restartOldJobs()` after a formerly `running` record has been reset to `ready`.
func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction)
/// The `OperationQueue` on which the operation for `jobRecord` is enqueued.
func operationQueue(jobRecord: JobRecordType) -> OperationQueue
/// Builds the operation for `jobRecord`. Throwing `JobError.assertionFailure` marks the record
/// permanently failed; throwing `JobError.obsolete` marks it obsolete.
func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType
/// When `requiresInternet` is true, we immediately run any jobs which are waiting for retry upon detecting Reachability.
///
/// Because `Reachability` isn't 100% reliable, the jobs will be attempted regardless of what we think our current Reachability is.
/// However, because these jobs will likely fail many times in succession, their `retryInterval` could be quite long by the time we
/// are back online.
var requiresInternet: Bool { get }
/// Upper bound on attempts; `remainingRetries(durableOperation:)` subtracts the record's `failureCount` from it.
static var maxRetries: UInt { get }
}
public extension JobQueue {
// MARK: Dependencies
/// Default connection: the shared primary storage's read-write connection.
var dbConnection: YapDatabaseConnection {
    let primaryStorage = SSKEnvironment.shared.primaryStorage
    return primaryStorage.dbReadWriteConnection
}
/// Default finder: a freshly created `JobRecordFinder` on every access.
var finder: JobRecordFinder {
    get {
        return JobRecordFinder()
    }
}
/// The shared environment's reachability manager.
var reachabilityManager: SSKReachabilityManager {
    let environment = SSKEnvironment.shared
    return environment.reachabilityManager
}
// MARK:
/// Persists `jobRecord` and schedules work for after the transaction completes.
///
/// - Parameters:
///   - jobRecord: The record to enqueue; asserted (debug builds) to be in the `ready` state.
///   - transaction: The write transaction the record is saved with.
func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) {
    assert(jobRecord.status == .ready)
    jobRecord.save(with: transaction)

    // Work is started from the transaction's completion block, on a global queue.
    let completionQueue = DispatchQueue.global()
    transaction.addCompletionQueue(completionQueue) {
        self.startWorkWhenAppIsReady()
    }
}
/// Schedules a `workStep()` on a global dispatch queue, deferring until the app is ready
/// unless running under tests.
func startWorkWhenAppIsReady() {
    // Runs `workStep()` asynchronously on a global dispatch queue.
    func dispatchWorkStep() {
        DispatchQueue.global().async {
            self.workStep()
        }
    }

    // Under test, don't wait for AppReadiness.
    if CurrentAppContext().isRunningTests {
        dispatchWorkStep()
        return
    }

    AppReadiness.runNowOrWhenAppDidBecomeReady {
        dispatchWorkStep()
    }
}
/// Starts the next `ready` job for this queue, if there is one, then schedules another step.
///
/// Does nothing (and fails a debug assertion outside of tests) if the queue has not been set up.
/// Inside `Storage.writeSync`, the next ready record is marked as started, turned into a
/// `DurableOperation` via `buildOperation(jobRecord:transaction:)`, wired to this queue as its
/// delegate, and added to the queue returned by `operationQueue(jobRecord:)`.
///
/// Error handling while starting a job:
/// - `JobError.assertionFailure`: the record is marked permanently failed.
/// - `JobError.obsolete`: the record is marked obsolete.
/// - any other error: the error is logged and the record is marked permanently failed.
func workStep() {
    Logger.debug("")
    guard isSetup else {
        if !CurrentAppContext().isRunningTests {
            owsFailDebug("not setup")
        }
        return
    }
    Storage.writeSync { transaction in
        guard let nextJob: JobRecordType = self.finder.getNextReady(label: self.jobRecordLabel, transaction: transaction) as? JobRecordType else {
            // No ready record: stop here without scheduling another step.
            Logger.verbose("nothing left to enqueue")
            return
        }
        do {
            try nextJob.saveAsStarted(transaction: transaction)
            let operationQueue = self.operationQueue(jobRecord: nextJob)
            let durableOperation = try self.buildOperation(jobRecord: nextJob, transaction: transaction)
            durableOperation.durableOperationDelegate = self as? Self.DurableOperationType.DurableOperationDelegateType
            assert(durableOperation.durableOperationDelegate != nil)
            let remainingRetries = self.remainingRetries(durableOperation: durableOperation)
            durableOperation.remainingRetries = remainingRetries
            self.runningOperations.append(durableOperation)
            Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)")
            operationQueue.addOperation(durableOperation.operation)
        } catch JobError.assertionFailure(let description) {
            owsFailDebug("assertion failure: \(description)")
            nextJob.saveAsPermanentlyFailed(transaction: transaction)
        } catch JobError.obsolete(let description) {
            // TODO is this even worthwhile to have obsolete state? Should we just delete the task outright?
            Logger.verbose("marking obsolete task as such. description:\(description)")
            nextJob.saveAsObsolete(transaction: transaction)
        } catch {
            // Surface the actual error, and move the record out of the `ready`/`running` states so it
            // is neither picked up again by the re-dispatch below nor left running without an
            // operation. This mirrors how the other unexpected-error paths in this queue behave.
            owsFailDebug("unexpected error: \(error)")
            nextJob.saveAsPermanentlyFailed(transaction: transaction)
        }
        // Keep draining: look for the next ready record in a fresh step.
        DispatchQueue.global().async {
            self.workStep()
        }
    }
}
/// Resets every record of this queue that is still marked `running` back to `ready`.
///
/// For each record successfully reset, `didMarkAsReady(oldJobRecord:transaction:)` is invoked.
/// A record that cannot be reset is marked permanently failed; a record of an unexpected type
/// is reported and skipped.
public func restartOldJobs() {
    Storage.writeSync { transaction in
        let staleRecords = self.finder.allRecords(label: self.jobRecordLabel, status: .running, transaction: transaction)
        Logger.info("marking old `running` JobRecords as ready: \(staleRecords.count)")

        staleRecords.forEach { candidate in
            guard let jobRecord = candidate as? JobRecordType else {
                owsFailDebug("unexpectred jobRecord: \(candidate)")
                return
            }

            do {
                try jobRecord.saveRunningAsReady(transaction: transaction)
                self.didMarkAsReady(oldJobRecord: jobRecord, transaction: transaction)
            } catch {
                owsFailDebug("failed to mark old running records as ready error: \(error)")
                jobRecord.saveAsPermanentlyFailed(transaction: transaction)
            }
        }
    }
}
/// Shared setup logic for conforming queues.
///
/// In the common case a conforming type's `setup` is nothing more than:
///
///     func setup() {
///         defaultSetup()
///     }
///
/// This is not simply named `setup` because `setup` is called from objc, and a
/// default implementation supplied by a protocol cannot be marked as @objc.
///
/// Steps, in order: reset old `running` records, observe reachability changes
/// (only when `requiresInternet`), flag the queue as set up, and kick off work.
func defaultSetup() {
    if isSetup {
        owsFailDebug("already ready already")
        return
    }

    restartOldJobs()

    if requiresInternet {
        let notificationCenter = NotificationCenter.default
        let observedObject = reachabilityManager.observationContext
        notificationCenter.addObserver(forName: .reachabilityChanged, object: observedObject, queue: nil) { _ in
            guard self.reachabilityManager.isReachable else {
                Logger.verbose("isReachable: false")
                return
            }
            Logger.verbose("isReachable: true")
            self.becameReachable()
        }
    }

    isSetup = true
    startWorkWhenAppIsReady()
}
/// Number of attempts still allowed for `durableOperation`.
///
/// - Returns: `maxRetries` minus the job record's `failureCount`, clamped at zero.
func remainingRetries(durableOperation: DurableOperationType) -> UInt {
    let retryLimit = type(of: self).maxRetries
    let failuresSoFar = durableOperation.jobRecord.failureCount
    // Clamp rather than subtract blindly: unsigned subtraction would trap on underflow.
    return retryLimit > failuresSoFar ? retryLimit - failuresSoFar : 0
}
/// Runs any queued retry now that the network appears reachable.
///
/// Only meaningful for queues whose `requiresInternet` is `true`; otherwise it
/// fails a debug assertion and does nothing.
func becameReachable() {
    if !requiresInternet {
        owsFailDebug("should only be called if `requiresInternet` is true")
        return
    }
    _ = runAnyQueuedRetry()
}
/// Asks the first running operation to run its queued retry.
///
/// - Returns: The operation that was asked, or `nil` when no operation is running.
func runAnyQueuedRetry() -> DurableOperationType? {
    if let headOperation = runningOperations.first {
        headOperation.operation.runAnyQueuedRetry()
        return headOperation
    }
    return nil
}
// MARK: DurableOperationDelegate
/// Default success handling: stop tracking the operation and delete its job record.
func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) {
    // Operations are matched by identity.
    runningOperations.removeAll { $0 === operation }
    operation.jobRecord.remove(with: transaction)
}
/// Default error handling: record a failure on the operation's job record.
///
/// If recording the failure itself throws, the record is marked permanently failed.
func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction) {
    let record = operation.jobRecord
    do {
        try record.addFailure(transaction: transaction)
    } catch {
        owsFailDebug("error while addingFailure: \(error)")
        record.saveAsPermanentlyFailed(transaction: transaction)
    }
}
/// Default failure handling: stop tracking the operation and mark its job record permanently failed.
func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) {
    // Operations are matched by identity.
    runningOperations.removeAll { $0 === operation }
    operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction)
}
}