// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable

import Foundation
import GRDB

/// The interface used to configure the job runner, query the state of its jobs and schedule new jobs.
public protocol JobRunnerType {
    // MARK: - Configuration
    
    /// Registers the executor type which should be used to run jobs of the given variant
    func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant)
    
    /// Returns whether the given queue is currently allowed to start
    func canStart(queue: JobQueue?) -> Bool
    
    /// Registers a callback to be run in relation to the blocking queue (see the conforming type
    /// for the exact timing)
    func afterBlockingQueue(callback: @escaping () -> ())
    
    // MARK: - State Management
    
    /// Retrieves info for jobs matching the provided filters, keyed by job id
    ///
    /// - Parameters:
    ///   - jobs: When provided only these jobs are considered
    ///   - state: The state(s) a job must be in to be included
    ///   - variant: When provided only jobs of this variant are considered
    func jobInfoFor(jobs: [Job]?, state: JobRunner.JobState, variant: Job.Variant?) -> [Int64: JobRunner.JobInfo]
    
    func appDidFinishLaunching(using dependencies: Dependencies)
    func appDidBecomeActive(using dependencies: Dependencies)
    func startNonBlockingQueues(using dependencies: Dependencies)
    
    func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, onComplete: (() -> ())?)
    
    // MARK: - Job Scheduling
    
    @discardableResult func add(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job?
    func upsert(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies)
    @discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)?
}

// MARK: - JobRunnerType Convenience

public extension JobRunnerType {
    /// Info for every job regardless of state or variant
    func allJobInfo() -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: nil, state: .any, variant: nil)
    }
    
    /// Info for the provided jobs regardless of state
    func jobInfoFor(jobs: [Job]) -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: jobs, state: .any, variant: nil)
    }

    /// Info for the provided jobs which are in the given state
    func jobInfoFor(jobs: [Job], state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: jobs, state: state, variant: nil)
    }

    /// Info for all jobs which are in the given state
    func jobInfoFor(state: JobRunner.JobState) -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: nil, state: state, variant: nil)
    }

    /// Info for all jobs of the given variant which are in the given state
    func jobInfoFor(state: JobRunner.JobState, variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: nil, state: state, variant: variant)
    }

    /// Info for all jobs of the given variant regardless of state
    func jobInfoFor(variant: Job.Variant) -> [Int64: JobRunner.JobInfo] {
        return jobInfoFor(jobs: nil, state: .any, variant: variant)
    }
    
    /// Returns `true` when the provided job is reported as being in the `running` state (a `nil`
    /// job is never running)
    func isCurrentlyRunning(_ job: Job?) -> Bool {
        guard let job: Job = job else { return false }
        
        return !jobInfoFor(jobs: [job], state: .running).isEmpty
    }
    
    /// Returns `true` when there is a job matching the filters whose `detailsData` is identical to the
    /// encoded form of `jobDetails`
    ///
    /// **Note:** The details are encoded with sorted keys so the comparison is deterministic, if the
    /// details cannot be encoded this returns `false`
    ///
    /// - Parameters:
    ///   - variant: When provided only jobs of this variant are considered
    ///   - state: The state(s) a job must be in to be considered (defaults to `.any`)
    ///   - jobDetails: The details value to look for
    func hasJob<T: Encodable>(
        of variant: Job.Variant? = nil,
        inState state: JobRunner.JobState = .any,
        with jobDetails: T
    ) -> Bool {
        guard
            let detailsData: Data = try? JSONEncoder()
                .with(outputFormatting: .sortedKeys)    // Needed for deterministic comparison
                .encode(jobDetails)
        else { return false }
        
        return jobInfoFor(jobs: nil, state: state, variant: variant)
            .values
            .contains(where: { $0.detailsData == detailsData })
    }
    
    /// Convenience which provides `nil` defaults for both parameters
    ///
    /// NOTE(review): this has the same signature as the protocol requirement it forwards to, so it
    /// presumably relies on the conforming type providing its own implementation - confirm that a
    /// conforming type without one would not end up calling itself
    func stopAndClearPendingJobs(exceptForVariant: Job.Variant? = nil, onComplete: (() -> ())? = nil) {
        stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete)
    }
}

// MARK: - JobExecutor

public protocol JobExecutor {
    /// The maximum number of times the job can fail before it fails permanently
    ///
    /// **Note:** A value of `-1` means it will retry indefinitely
    static var maxFailureCount: Int { get }
    static var requiresThreadId: Bool { get }
    static var requiresInteractionId: Bool { get }

    /// This method contains the logic needed to complete a job
    ///
    /// **Note:** The code in this method should run synchronously and the various
    /// "result" blocks should not be called within a database closure
    ///
    /// - Parameters:
    ///   - job: The job which is being run
    ///   - success: The closure which is called when the job succeeds (with an
    ///   updated `job` and a flag indicating whether the job should forcibly stop running)
    ///   - failure: The closure which is called when the job fails (with an updated
    ///   `job`, an `Error` (if applicable) and a flag indicating whether it was a permanent
    ///   failure)
    ///   - deferred: The closure which is called when the job is deferred (with an
    ///   updated `job`)
    static func run(
        _ job: Job,
        queue: DispatchQueue,
        success: @escaping (Job, Bool, Dependencies) -> (),
        failure: @escaping (Job, Error?, Bool, Dependencies) -> (),
        deferred: @escaping (Job, Dependencies) -> (),
        using dependencies: Dependencies
    )
}

// MARK:
// MARK: - JobRunner

/// Owns the blocking queue and the per-variant job queues, decides when they are allowed to start and
/// routes newly scheduled jobs to the queue responsible for their variant.
public final class JobRunner: JobRunnerType {
    /// The state(s) a job can be in when querying the runner for job info
    public struct JobState: OptionSet, Hashable {
        public let rawValue: UInt8
        
        public init(rawValue: UInt8) {
            self.rawValue = rawValue
        }
        
        public static let pending: JobState = JobState(rawValue: 1 << 0)
        public static let running: JobState = JobState(rawValue: 1 << 1)
        
        public static let any: JobState = [ .pending, .running ]
    }
    
    public enum JobResult {
        case succeeded
        case failed
        case deferred
        case notFound
    }

    /// A snapshot of the identifying information for a job
    public struct JobInfo: Equatable, CustomDebugStringConvertible {
        public let variant: Job.Variant
        public let threadId: String?
        public let interactionId: Int64?
        public let detailsData: Data?
        
        public var debugDescription: String {
            // Note: The closing bracket was previously missing from the "Data(...)" description
            let dataDescription: String = detailsData
                .map { data in "Data(hex: \(data.toHexString()), \(data.bytes.count) bytes)" }
                .defaulting(to: "nil")
            
            return [
                "JobRunner.JobInfo(",
                "variant: \(variant),",
                " threadId: \(threadId ?? "nil"),",
                " interactionId: \(interactionId.map { "\($0)" } ?? "nil"),",
                " detailsData: \(dataDescription)",
                ")"
            ].joined()
        }
    }
    
    // MARK: - Variables
    
    private let allowToExecuteJobs: Bool
    private let blockingQueue: Atomic<JobQueue?>
    private let queues: Atomic<[Job.Variant: JobQueue]>
    private var blockingQueueDrainCallback: Atomic<[() -> ()]> = Atomic([])
    
    internal var appReadyToStartQueues: Atomic<Bool> = Atomic(false)
    internal var appHasBecomeActive: Atomic<Bool> = Atomic(false)
    // NOTE(review): the element type is assumed to be the job id (`Int64`) - confirm against usages
    internal var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
    internal var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
    internal var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil)
    
    /// Non-blocking queues can only start once the blocking queue has started at least once, is no
    /// longer running and the app has become active
    private var canStartNonBlockingQueue: Bool {
        blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue == true &&
        blockingQueue.wrappedValue?.isRunning.wrappedValue != true &&
        appHasBecomeActive.wrappedValue
    }
    
    // MARK: - Initialization
    
    /// Creates the blocking queue and the non-blocking queues, assigning every job variant which isn't
    /// excluded to exactly one non-blocking queue
    ///
    /// - Parameters:
    ///   - isTestingJobRunner: When `true` jobs are allowed to execute regardless of the app context
    ///   - variantsToExclude: Variants which should not be assigned to any queue
    ///   - dependencies: The dependencies captured by the blocking queue's drained callback
    init(
        isTestingJobRunner: Bool = false,
        variantsToExclude: [Job.Variant] = [],
        using dependencies: Dependencies = Dependencies()
    ) {
        // Variants are removed from this set as they get assigned to a specific queue, anything
        // remaining at the end is handled by the general queue
        var jobVariants: Set<Job.Variant> = Job.Variant.allCases
            .filter { !variantsToExclude.contains($0) }
            .asSet()
        
        self.allowToExecuteJobs = (
            isTestingJobRunner || (
                HasAppContext() &&
                CurrentAppContext().isMainApp &&
                !SNUtilitiesKit.isRunningTests
            )
        )
        self.blockingQueue = Atomic(
            JobQueue(
                type: .blocking,
                executionType: .serial,
                qos: .default,
                isTestingJobRunner: isTestingJobRunner,
                jobVariants: []
            )
        )
        self.queues = Atomic([
            // MARK: -- Message Send Queue
            
            JobQueue(
                type: .messageSend,
                executionType: .concurrent, // Allow as many jobs to run at once as supported by the device
                qos: .default,
                isTestingJobRunner: isTestingJobRunner,
                jobVariants: [
                    jobVariants.remove(.attachmentUpload),
                    jobVariants.remove(.messageSend),
                    jobVariants.remove(.notifyPushServer),
                    jobVariants.remove(.sendReadReceipts),
                    jobVariants.remove(.groupLeaving),
                    jobVariants.remove(.configurationSync)
                ].compactMap { $0 }
            ),
            
            // MARK: -- Message Receive Queue
            
            JobQueue(
                type: .messageReceive,
                // Explicitly serial as executing concurrently means message receives getting processed at
                // different speeds which can result in:
                // • Small batches of messages appearing in the UI before larger batches
                // • Closed group messages encrypted with updated keys could start parsing before it's key
                //   update message has been processed (ie. guaranteed to fail)
                executionType: .serial,
                qos: .default,
                isTestingJobRunner: isTestingJobRunner,
                jobVariants: [
                    jobVariants.remove(.messageReceive),
                    jobVariants.remove(.configMessageReceive)
                ].compactMap { $0 }
            ),
            
            // MARK: -- Attachment Download Queue
            
            JobQueue(
                type: .attachmentDownload,
                executionType: .serial,
                qos: .utility,
                isTestingJobRunner: isTestingJobRunner,
                jobVariants: [
                    jobVariants.remove(.attachmentDownload)
                ].compactMap { $0 }
            ),
            
            // MARK: -- General Queue
            
            JobQueue(
                type: .general(number: 0),
                executionType: .serial,
                qos: .utility,
                isTestingJobRunner: isTestingJobRunner,
                jobVariants: Array(jobVariants)
            )
        ].reduce(into: [:]) { prev, next in
            next.jobVariants.forEach { variant in
                prev[variant] = next
            }
        })
        
        // Now that we've finished setting up the JobRunner, update the queue closures
        self.blockingQueue.mutate {
            $0?.canStart = { [weak self] queue -> Bool in (self?.canStart(queue: queue) == true) }
            $0?.onQueueDrained = { [weak self] in
                // Once all blocking jobs have been completed we want to start running
                // the remaining job queues
                self?.startNonBlockingQueues(using: dependencies)
                
                self?.blockingQueueDrainCallback.mutate {
                    $0.forEach { $0() }
                    $0 = []
                }
            }
        }
        
        self.queues.mutate {
            $0.values.forEach { queue in
                queue.canStart = { [weak self] targetQueue -> Bool in (self?.canStart(queue: targetQueue) == true) }
            }
        }
    }
    
    // MARK: - Configuration
    
    /// Registers the executor with both the blocking queue and the queue responsible for the variant
    public func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
        blockingQueue.wrappedValue?.setExecutor(executor, for: variant) // The blocking queue can run any job
        queues.wrappedValue[variant]?.setExecutor(executor, for: variant)
    }
    
    /// A queue can start when job execution is allowed, the app is ready to start queues and the queue
    /// is either the blocking queue or the non-blocking queues are allowed to start
    public func canStart(queue: JobQueue?) -> Bool {
        return (
            allowToExecuteJobs &&
            appReadyToStartQueues.wrappedValue && (
                queue?.type == .blocking ||
                canStartNonBlockingQueue
            )
        )
    }

    /// Runs the callback immediately if the blocking queue has already started at least once and is
    /// not currently running, otherwise stores it to be triggered when the blocking queue is drained
    public func afterBlockingQueue(callback: @escaping () -> ()) {
        guard
            (blockingQueue.wrappedValue?.hasStartedAtLeastOnce.wrappedValue != true) ||
            (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
        else { return callback() }
    
        blockingQueueDrainCallback.mutate { $0.append(callback) }
    }
    
    // MARK: - State Management
    
    /// Collects info for the pending and/or running jobs which match the provided filters
    ///
    /// **Note:** Pending jobs are collected before running jobs so if a job id appears in both then the
    /// running info is the one returned
    ///
    /// - Parameters:
    ///   - jobs: When provided only jobs with a matching id and variant are included
    ///   - state: The state(s) to collect info for
    ///   - variant: When provided only this variant is included (takes precedence over the variants
    ///   of the provided `jobs`)
    /// - Returns: The matching job info keyed by job id
    public func jobInfoFor(
        jobs: [Job]?,
        state: JobRunner.JobState,
        variant: Job.Variant?
    ) -> [Int64: JobRunner.JobInfo] {
        var result: [(Int64, JobRunner.JobInfo)] = []
        let targetKeys: [JobQueue.JobKey] = (jobs?.compactMap { JobQueue.JobKey($0) } ?? [])
        let targetVariants: [Job.Variant] = (variant.map { [$0] } ?? jobs?.map { $0.variant })
            .defaulting(to: [])
        
        // Insert the state of any pending jobs
        if state.contains(.pending) {
            func infoFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, JobRunner.JobInfo)] {
                return (queue?.pendingJobsQueue.wrappedValue
                    .filter { variants.isEmpty || variants.contains($0.variant) }
                    .compactMap { job -> (Int64, JobRunner.JobInfo)? in
                        guard let jobKey: JobQueue.JobKey = JobQueue.JobKey(job) else { return nil }
                        guard
                            targetKeys.isEmpty ||
                            targetKeys.contains(jobKey)
                        else { return nil }
                        
                        return (
                            jobKey.id,
                            JobRunner.JobInfo(
                                variant: job.variant,
                                threadId: job.threadId,
                                interactionId: job.interactionId,
                                detailsData: job.details
                            )
                        )
                    })
                    .defaulting(to: [])
            }
            
            result.append(contentsOf: infoFor(queue: blockingQueue.wrappedValue, variants: targetVariants))
            queues.wrappedValue
                .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) }
                .map { _, queue in queue }
                .asSet()
                .forEach { queue in result.append(contentsOf: infoFor(queue: queue, variants: targetVariants)) }
        }
        
        // Insert the state of any running jobs
        if state.contains(.running) {
            func infoFor(queue: JobQueue?, variants: [Job.Variant]) -> [(Int64, JobRunner.JobInfo)] {
                return (queue?.infoForAllCurrentlyRunningJobs()
                    .filter { variants.isEmpty || variants.contains($0.value.variant) }
                    .compactMap { jobId, info -> (Int64, JobRunner.JobInfo)? in
                        guard
                            targetKeys.isEmpty ||
                            targetKeys.contains(JobQueue.JobKey(id: jobId, variant: info.variant))
                        else { return nil }
                        
                        return (jobId, info)
                    })
                    .defaulting(to: [])
            }
            
            result.append(contentsOf: infoFor(queue: blockingQueue.wrappedValue, variants: targetVariants))
            queues.wrappedValue
                .filter { key, _ -> Bool in targetVariants.isEmpty || targetVariants.contains(key) }
                .map { _, queue in queue }
                .asSet()
                .forEach { queue in result.append(contentsOf: infoFor(queue: queue, variants: targetVariants)) }
        }
        
        return result
            .reduce(into: [:]) { result, next in
                result[next.0] = next.1
            }
    }
    
    /// Loads the "on launch" jobs from the database, starting the blocking ones immediately and adding
    /// (but not starting) the non-blocking ones to their queues
    public func appDidFinishLaunching(using dependencies: Dependencies) {
        // Flag that the JobRunner can start its queues
        appReadyToStartQueues.mutate { $0 = true }
        
        // Note: 'appDidBecomeActive' will run on first launch anyway so we can
        // leave those jobs out and can wait until then to start the JobRunner
        let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = dependencies.storage
            .read { db in
                let blockingJobs: [Job] = try Job
                    .filter(
                        [
                            Job.Behaviour.recurringOnLaunch,
                            Job.Behaviour.runOnceNextLaunch
                        ].contains(Job.Columns.behaviour)
                    )
                    .filter(Job.Columns.shouldBlock == true)
                    .order(
                        Job.Columns.priority.desc,
                        Job.Columns.id
                    )
                    .fetchAll(db)
                let nonblockingJobs: [Job] = try Job
                    .filter(
                        [
                            Job.Behaviour.recurringOnLaunch,
                            Job.Behaviour.runOnceNextLaunch
                        ].contains(Job.Columns.behaviour)
                    )
                    .filter(Job.Columns.shouldBlock == false)
                    .order(
                        Job.Columns.priority.desc,
                        Job.Columns.id
                    )
                    .fetchAll(db)
                
                return (blockingJobs, nonblockingJobs)
            }
            .defaulting(to: ([], []))
        
        // Add and start any blocking jobs
        blockingQueue.wrappedValue?.appDidFinishLaunching(
            with: jobsToRun.blocking,
            canStart: true,
            using: dependencies
        )
        
        // Add any non-blocking jobs (we don't start these in case there are blocking "on active"
        // jobs as well)
        let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant)
        let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
        
        jobsByVariant.forEach { variant, jobs in
            jobQueues[variant]?.appDidFinishLaunching(
                with: jobs,
                canStart: false,
                using: dependencies
            )
        }
    }
    
    /// Loads the "on active" jobs from the database and adds them to their queues, starting the queues
    /// unless the blocking queue is still running
    public func appDidBecomeActive(using dependencies: Dependencies) {
        // Flag that the JobRunner can start its queues and start queueing non-launch jobs
        appReadyToStartQueues.mutate { $0 = true }
        appHasBecomeActive.mutate { $0 = true }
        
        // If we have a running "shutdownBackgroundTask" then we want to cancel it as otherwise it
        // can result in the database being suspended and us being unable to interact with it at all
        shutdownBackgroundTask.mutate {
            $0?.cancel()
            $0 = nil
        }
        
        // Retrieve any jobs which should run when becoming active
        let hasCompletedInitialBecomeActive: Bool = self.hasCompletedInitialBecomeActive.wrappedValue
        let jobsToRun: [Job] = dependencies.storage
            .read { db in
                return try Job
                    .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
                    .order(
                        Job.Columns.priority.desc,
                        Job.Columns.id
                    )
                    .fetchAll(db)
            }
            .defaulting(to: [])
            .filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
        
        // Store the current queue state locally to avoid multiple atomic retrievals
        let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
        let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
        
        guard !jobsToRun.isEmpty else {
            if !blockingQueueIsRunning {
                jobQueues.map { _, queue in queue }.asSet().forEach { $0.start(using: dependencies) }
            }
            return
        }
        
        // Add and start any non-blocking jobs (if there are no blocking jobs)
        //
        // We only want to trigger the queue to start once so we need to consolidate the
        // queues to list of jobs (as queues can handle multiple job variants), this means
        // that 'onActive' jobs will be queued before any standard jobs
        let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
        jobQueues
            .reduce(into: [:]) { result, variantAndQueue in
                result[variantAndQueue.value] = (result[variantAndQueue.value] ?? [])
                    .appending(contentsOf: (jobsByVariant[variantAndQueue.key] ?? []))
            }
            .forEach { queue, jobs in
                queue.appDidBecomeActive(
                    with: jobs,
                    canStart: !blockingQueueIsRunning,
                    using: dependencies
                )
            }
        
        self.hasCompletedInitialBecomeActive.mutate { $0 = true }
    }
    
    /// Starts every distinct non-blocking queue
    public func startNonBlockingQueues(using dependencies: Dependencies) {
        queues.wrappedValue.map { _, queue in queue }.asSet().forEach { queue in
            queue.start(using: dependencies)
        }
    }
    
    /// Prevents the queues from starting and stops every non-blocking queue except the one handling
    /// `exceptForVariant`, that queue is given the chance to drain before `onComplete` is triggered
    ///
    /// - Parameters:
    ///   - exceptForVariant: The variant whose queue should be allowed to finish its jobs
    ///   - onComplete: Triggered immediately when there is no running exempt queue, otherwise when
    ///   the exempt queue is drained or the background task ends without succeeding or being cancelled
    public func stopAndClearPendingJobs(
        exceptForVariant: Job.Variant?,
        onComplete: (() -> ())?
    ) {
        // Inform the JobRunner that it can't start any queues (this is to prevent queues from
        // rescheduling themselves while in the background, when the app restarts or becomes active
        // the JobRunner will update this flag)
        appReadyToStartQueues.mutate { $0 = false }
        appHasBecomeActive.mutate { $0 = false }
        
        // Stop all queues except for the one containing the `exceptForVariant`
        queues.wrappedValue
            .map { _, queue in queue }
            .asSet()
            .filter { queue -> Bool in
                guard let exceptForVariant: Job.Variant = exceptForVariant else { return true }
                
                return !queue.jobVariants.contains(exceptForVariant)
            }
            .forEach { $0.stopAndClearPendingJobs() }
        
        // Ensure the queue is actually running (if not then trigger the callback immediately)
        guard
            let exceptForVariant: Job.Variant = exceptForVariant,
            let queue: JobQueue = queues.wrappedValue[exceptForVariant],
            queue.isRunning.wrappedValue == true
        else {
            onComplete?()
            return
        }
        
        let oldQueueDrained: (() -> ())? = queue.onQueueDrained
        
        // Create a backgroundTask to give the queue the chance to properly be drained
        shutdownBackgroundTask.mutate {
            $0 = OWSBackgroundTask(labelStr: #function) { [weak queue] state in
                // If the background task didn't succeed then trigger the onComplete (and hope we have
                // enough time to complete its logic)
                guard state != .cancelled else {
                    queue?.onQueueDrained = oldQueueDrained
                    return
                }
                guard state != .success else { return }
                
                onComplete?()
                queue?.onQueueDrained = oldQueueDrained
                queue?.stopAndClearPendingJobs()
            }
        }
        
        // Add a callback to be triggered once the queue is drained
        queue.onQueueDrained = { [weak self, weak queue] in
            oldQueueDrained?()
            queue?.onQueueDrained = oldQueueDrained
            onComplete?()
            
            self?.shutdownBackgroundTask.mutate { $0 = nil }
        }
    }
    
    // MARK: - Execution
    
    /// Inserts the job into the database and, if the runner is ready for it, adds it to the queue for
    /// its variant (starting that queue after the transaction when `canStartJob` is `true`)
    ///
    /// - Returns: The inserted job, or `nil` if it couldn't be inserted or needs to start but has no id
    @discardableResult public func add(
        _ db: Database,
        job: Job?,
        canStartJob: Bool,
        using dependencies: Dependencies
    ) -> Job? {
        // Store the job into the database (getting an id for it)
        guard let updatedJob: Job = try? job?.inserted(db) else {
            SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
            return nil
        }
        guard !canStartJob || updatedJob.id != nil else {
            SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
            return nil
        }
        
        // Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
        // once the queue actually get started later)
        guard canAddToQueue(updatedJob) else { return updatedJob }
        
        queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob, using: dependencies)
        
        // Don't start the queue if the job can't be started
        guard canStartJob else { return updatedJob }
        
        // Start the job runner if needed
        db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(updatedJob.variant)") { [weak self] _ in
            self?.queues.wrappedValue[updatedJob.variant]?.start(using: dependencies)
        }
        
        return updatedJob
    }
    
    /// Adds the job if it doesn't have an id yet, otherwise upserts it into the queue for its variant
    /// (starting that queue after the transaction when `canStartJob` is `true`)
    public func upsert(
        _ db: Database,
        job: Job?,
        canStartJob: Bool,
        using dependencies: Dependencies
    ) {
        guard let job: Job = job else { return }    // Ignore null jobs
        guard job.id != nil else {
            add(db, job: job, canStartJob: canStartJob, using: dependencies)
            return
        }
        
        // Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
        // once the queue actually get started later)
        guard canAddToQueue(job) else { return }
        
        queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob, using: dependencies)
        
        // Don't start the queue if the job can't be started
        guard canStartJob else { return }
        
        // Start the job runner if needed
        db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(job.variant)") { [weak self] _ in
            self?.queues.wrappedValue[job.variant]?.start(using: dependencies)
        }
    }
    
    /// Inserts the job into the database and places it in its queue ahead of `otherJob`
    ///
    /// **Note:** Jobs with an "on active" or "on launch" behaviour are rejected
    ///
    /// - Returns: The id and inserted job, or `nil` if the job was rejected or couldn't be inserted
    @discardableResult public func insert(
        _ db: Database,
        job: Job?,
        before otherJob: Job
    ) -> (Int64, Job)? {
        switch job?.behaviour {
            case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch:
                SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")")
                return nil
                
            default: break
        }
        
        // Store the job into the database (getting an id for it)
        guard let updatedJob: Job = try? job?.inserted(db) else {
            SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job")
            return nil
        }
        guard let jobId: Int64 = updatedJob.id else {
            SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id")
            return nil
        }
        
        queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
        
        return (jobId, updatedJob)
    }
    
    /// Registers a callback with the queue for the job's variant, the callback receives `.notFound`
    /// immediately when the job is `nil`, has no id or no queue handles its variant
    internal func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
        guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
            callback(.notFound)
            return
        }
        
        queue.afterCurrentlyRunningJob(jobId, callback: callback)
    }
    
    /// Removes the job from the pending jobs of the queue for its variant (does nothing for a `nil`
    /// job or a job without an id)
    internal func removePendingJob(_ job: Job?) {
        guard let job: Job = job, let jobId: Int64 = job.id else { return }
        
        queues.wrappedValue[job.variant]?.removePendingJob(jobId)
    }
    
    // MARK: - Convenience

    /// Calculates the delay before a failed job should be retried based on its `failureCount`
    ///
    /// **Note:** The cap is applied to the final interval (it was previously applied before the `0.25`
    /// scale which limited the delay to 150s rather than the intended 10 minutes)
    fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
        // Arbitrary backoff factor (0.25s * 2^failureCount)...
        // try  1 delay: 0.5s
        // try  2 delay: 1s
        // ...
        // try  5 delay: 8s
        // ...
        // try 11 delay: 512s
        let maxBackoff: Double = 10 * 60 // 10 minutes
        return min(maxBackoff, 0.25 * pow(2, Double(job.failureCount)))
    }
    
    /// Returns whether the job is allowed to be added to a queue at the moment
    fileprivate func canAddToQueue(_ job: Job) -> Bool {
        // We can only start the job if it's an "on launch" job or the app has become active
        return (
            job.behaviour == .runOnceNextLaunch ||
            job.behaviour == .recurringOnLaunch ||
            appHasBecomeActive.wrappedValue
        )
    }
}

// MARK: - JobQueue

public final class JobQueue: Hashable {
    fileprivate enum QueueType: Hashable {
        case blocking
        case general(number: Int)
        case messageSend
        case messageReceive
        case attachmentDownload
        
        var name: String {
            switch self {
                case .blocking: return "Blocking"
                case .general(let number): return "General-\(number)"
                case .messageSend: return "MessageSend"
                case .messageReceive: return "MessageReceive"
                case .attachmentDownload: return "AttachmentDownload"
            }
        }
    }
    
    fileprivate enum ExecutionType {
        /// A serial queue will execute one job at a time until the queue is empty, then will load any new/deferred
        /// jobs and run those one at a time
        case serial
        
        /// A concurrent queue will execute as many jobs as the device supports at once until the queue is empty,
        /// then will load any new/deferred jobs and try to start them all
        case concurrent
    }
    
    private class Trigger {
        private var timer: Timer?
        fileprivate var fireTimestamp: TimeInterval = 0
        
        static func create(
            queue: JobQueue,
            timestamp: TimeInterval,
            using dependencies: Dependencies
        ) -> Trigger?
{ /// Setup the trigger (wait at least 1 second before triggering) /// /// **Note:** We use the `Timer.scheduledTimerOnMainThread` method because running a timer /// on our random queue threads results in the timer never firing, the `start` method will redirect itself to /// the correct thread let trigger: Trigger = Trigger() trigger.fireTimestamp = max(1, (timestamp - dependencies.dateNow.timeIntervalSince1970)) trigger.timer = Timer.scheduledTimerOnMainThread( withTimeInterval: trigger.fireTimestamp, repeats: false, using: dependencies, block: { [weak queue] _ in queue?.start(using: dependencies) } ) return trigger } func invalidate() { // Need to do this to prevent a strong reference cycle timer?.invalidate() timer = nil } } fileprivate struct JobKey: Equatable, Hashable { fileprivate let id: Int64 fileprivate let variant: Job.Variant fileprivate init(id: Int64, variant: Job.Variant) { self.id = id self.variant = variant } fileprivate init?(_ job: Job?) { guard let id: Int64 = job?.id, let variant: Job.Variant = job?.variant else { return nil } self.id = id self.variant = variant } } private static let deferralLoopThreshold: Int = 3 private let id: UUID = UUID() fileprivate let type: QueueType private let executionType: ExecutionType private let qosClass: DispatchQoS private let queueKey: DispatchSpecificKey = DispatchSpecificKey() private let queueContext: String fileprivate let jobVariants: [Job.Variant] private lazy var internalQueue: DispatchQueue = { let result: DispatchQueue = DispatchQueue( label: self.queueContext, qos: self.qosClass, attributes: (self.executionType == .concurrent ? [.concurrent] : []), autoreleaseFrequency: .inherit, target: nil ) result.setSpecific(key: queueKey, value: queueContext) return result }() private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) fileprivate var canStart: ((JobQueue?) -> Bool)? fileprivate var onQueueDrained: (() -> ())? 
fileprivate var hasStartedAtLeastOnce: Atomic = Atomic(false) fileprivate var isRunning: Atomic = Atomic(false) fileprivate var pendingJobsQueue: Atomic<[Job]> = Atomic([]) private var nextTrigger: Atomic = Atomic(nil) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) private var currentlyRunningJobIds: Atomic> = Atomic([]) private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) private let maxDeferralsPerSecond: Int fileprivate var hasPendingJobs: Bool { !pendingJobsQueue.wrappedValue.isEmpty } // MARK: - Initialization fileprivate init( type: QueueType, executionType: ExecutionType, qos: DispatchQoS, isTestingJobRunner: Bool, jobVariants: [Job.Variant] ) { self.type = type self.executionType = executionType self.queueContext = "JobQueue-\(type.name)" self.qosClass = qos self.maxDeferralsPerSecond = (isTestingJobRunner ? 10 : 1) // Allow for tripping the defer loop in tests self.jobVariants = jobVariants } // MARK: - Hashable public func hash(into hasher: inout Hasher) { id.hash(into: &hasher) } public static func == (lhs: JobQueue, rhs: JobQueue) -> Bool { return (lhs.id == rhs.id) } // MARK: - Configuration fileprivate func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { executorMap.mutate { $0[variant] = executor } } // MARK: - Execution fileprivate func add( _ db: Database, job: Job, canStartJob: Bool, using dependencies: Dependencies ) { // Check if the job should be added to the queue guard canStartJob, job.behaviour != .runOnceNextLaunch, job.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970 else { return } guard job.id != nil else { SNLog("[JobRunner] Prevented attempt to add \(job.variant) job without id to queue") return } pendingJobsQueue.mutate { $0.append(job) } // If this is a concurrent queue then we should immediately start the next job guard 
executionType == .concurrent else { return } // Ensure that the database commit has completed and then trigger the next job to run (need // to ensure any interactions have been correctly inserted first) db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in self?.runNextJob(using: dependencies) } } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start /// the JobRunner /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started fileprivate func upsert( _ db: Database, job: Job, canStartJob: Bool, using dependencies: Dependencies ) { guard let jobId: Int64 = job.id else { SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return } // Lock the pendingJobsQueue while checking the index and inserting to ensure we don't run into // any multi-threading shenanigans // // Note: currently running jobs are removed from the pendingJobsQueue so we don't need to check // the 'jobsCurrentlyRunning' set var didUpdateExistingJob: Bool = false pendingJobsQueue.mutate { queue in if let jobIndex: Array.Index = queue.firstIndex(where: { $0.id == jobId }) { queue[jobIndex] = job didUpdateExistingJob = true } } // If we didn't update an existing job then we need to add it to the pendingJobsQueue guard !didUpdateExistingJob else { return } add(db, job: job, canStartJob: canStartJob, using: dependencies) } fileprivate func insert(_ job: Job, before otherJob: Job) { guard job.id != nil else { SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue") return } // Insert the job before the current job (re-adding the current job to // the start of the pendingJobsQueue if it's not in there) - this will mean the new // job will run and then the otherJob will run (or run again) once it's // done pendingJobsQueue.mutate { guard let 
otherJobIndex: Int = $0.firstIndex(of: otherJob) else { $0.insert(contentsOf: [job, otherJob], at: 0) return } $0.insert(job, at: otherJobIndex) } } fileprivate func appDidFinishLaunching( with jobs: [Job], canStart: Bool, using dependencies: Dependencies ) { pendingJobsQueue.mutate { $0.append(contentsOf: jobs) } // Start the job runner if needed if canStart && !isRunning.wrappedValue { start(using: dependencies) } } fileprivate func appDidBecomeActive( with jobs: [Job], canStart: Bool, using dependencies: Dependencies ) { let currentlyRunningJobIds: Set = currentlyRunningJobIds.wrappedValue pendingJobsQueue.mutate { queue in // Avoid re-adding jobs to the queue that are already in it (this can // happen if the user sends the app to the background before the 'onActive' // jobs and then brings it back to the foreground) let jobsNotAlreadyInQueue: [Job] = jobs .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) && !queue.contains(where: { $0.id == job.id }) } queue.append(contentsOf: jobsNotAlreadyInQueue) } // Start the job runner if needed if canStart && !isRunning.wrappedValue { start(using: dependencies) } } fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] { return currentlyRunningJobInfo.wrappedValue } fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { guard currentlyRunningJobIds.wrappedValue.contains(jobId) else { return callback(.notFound) } jobCallbacks.mutate { jobCallbacks in jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback) } } fileprivate func hasPendingOrRunningJobWith( threadId: String? = nil, interactionId: Int64? = nil, detailsData: Data? 
= nil ) -> Bool { let pendingJobs: [Job] = pendingJobsQueue.wrappedValue let currentlyRunningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue var possibleJobIds: Set = Set(currentlyRunningJobInfo.keys) .inserting(contentsOf: pendingJobs.compactMap { $0.id }.asSet()) // Remove any which don't have the matching threadId (if provided) if let targetThreadId: String = threadId { let pendingJobIdsWithWrongThreadId: Set = pendingJobs .filter { $0.threadId != targetThreadId } .compactMap { $0.id } .asSet() let runningJobIdsWithWrongThreadId: Set = currentlyRunningJobInfo .filter { _, info -> Bool in info.threadId != targetThreadId } .map { key, _ in key } .asSet() possibleJobIds = possibleJobIds .subtracting(pendingJobIdsWithWrongThreadId) .subtracting(runningJobIdsWithWrongThreadId) } // Remove any which don't have the matching interactionId (if provided) if let targetInteractionId: Int64 = interactionId { let pendingJobIdsWithWrongInteractionId: Set = pendingJobs .filter { $0.interactionId != targetInteractionId } .compactMap { $0.id } .asSet() let runningJobIdsWithWrongInteractionId: Set = currentlyRunningJobInfo .filter { _, info -> Bool in info.interactionId != targetInteractionId } .map { key, _ in key } .asSet() possibleJobIds = possibleJobIds .subtracting(pendingJobIdsWithWrongInteractionId) .subtracting(runningJobIdsWithWrongInteractionId) } // Remove any which don't have the matching details (if provided) if let targetDetailsData: Data = detailsData { let pendingJobIdsWithWrongDetailsData: Set = pendingJobs .filter { $0.details != targetDetailsData } .compactMap { $0.id } .asSet() let runningJobIdsWithWrongDetailsData: Set = currentlyRunningJobInfo .filter { _, info -> Bool in info.detailsData != detailsData } .map { key, _ in key } .asSet() possibleJobIds = possibleJobIds .subtracting(pendingJobIdsWithWrongDetailsData) .subtracting(runningJobIdsWithWrongDetailsData) } return !possibleJobIds.isEmpty } fileprivate func removePendingJob(_ 
jobId: Int64) {
        pendingJobsQueue.mutate { queue in
            queue = queue.filter { $0.id != jobId }
        }
    }
    
    // MARK: - Job Running
    
    /// Loads any pending jobs for this queue's variants from the database, appends them to the in-memory
    /// `pendingJobsQueue` and kicks off `runNextJob`
    ///
    /// - Parameters:
    ///   - forceWhenAlreadyRunning: When `true` the function continues even if the queue is already flagged
    ///   as running (the flag is preserved when the call has to hop onto the `internalQueue`)
    ///   - dependencies: The dependencies used for storage, time and queue access
    fileprivate func start(
        forceWhenAlreadyRunning: Bool = false,
        using dependencies: Dependencies
    ) {
        // Only start if the JobRunner is allowed to start the queue
        guard canStart?(self) == true else { return }
        guard forceWhenAlreadyRunning || !isRunning.wrappedValue else { return }
        
        // The JobRunner runs synchronously we need to ensure this doesn't start
        // on the main thread (if it is on the main thread then swap to a different thread)
        guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
            internalQueue.async(using: dependencies) { [weak self] in
                // Forward the 'forceWhenAlreadyRunning' flag, otherwise a forced start requested from
                // another thread would silently turn into a non-forced one after the queue hop
                self?.start(forceWhenAlreadyRunning: forceWhenAlreadyRunning, using: dependencies)
            }
            return
        }
        
        // Flag the JobRunner as running (to prevent something else from trying to start it
        // and messing with the execution behaviour)
        var wasAlreadyRunning: Bool = false
        isRunning.mutate { isRunning in
            wasAlreadyRunning = isRunning
            isRunning = true
        }
        hasStartedAtLeastOnce.mutate { $0 = true }
        
        // Get any pending jobs
        let jobVariants: [Job.Variant] = self.jobVariants
        let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue
        let jobsAlreadyInQueue: Set = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet()
        let jobsToRun: [Job] = dependencies.storage.read(using: dependencies) { db in
            try Job
                .filterPendingJobs(
                    variants: jobVariants,
                    excludeFutureJobs: true,
                    includeJobsWithDependencies: false
                )
                .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
                .filter(!jobsAlreadyInQueue.contains(Job.Columns.id))   // Exclude jobs already in the queue
                .fetchAll(db)
        }
        .defaulting(to: [])
        
        // Determine the number of jobs to run
        var jobCount: Int = 0
        pendingJobsQueue.mutate { queue in
            queue.append(contentsOf: jobsToRun)
            jobCount = queue.count
        }
        
        // If there are no pending jobs and nothing in the queue then schedule the JobRunner
        // to start again when the next scheduled job should start
        guard jobCount > 0 else {
            if jobIdsAlreadyRunning.isEmpty {
                isRunning.mutate { $0 = false }
                scheduleNextSoonestJob(using: dependencies)
            }
            return
        }
        
        // Run the first job in the pendingJobsQueue
        if !wasAlreadyRunning {
            SNLogNotTests("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))")
        }
        runNextJob(using: dependencies)
    }
    
    /// Flags the queue as not running and clears both the in-memory pending jobs and the deferral loop tracker
    fileprivate func stopAndClearPendingJobs() {
        isRunning.mutate { $0 = false }
        pendingJobsQueue.mutate { $0 = [] }
        deferLoopTracker.mutate { $0 = [:] }
    }
    
    /// Pops the first job off the `pendingJobsQueue`, validates it (executor, required ids, run timestamp and
    /// dependencies) and then hands it to its executor
    ///
    /// Jobs which fail validation are routed to `handleJobFailed` or `handleJobDeferred` instead of being run
    private func runNextJob(using dependencies: Dependencies) {
        // Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job)
        guard isRunning.wrappedValue else { return }
        
        // Ensure this is running on the correct queue
        guard DispatchQueue.with(key: queueKey, matches: queueContext, using: dependencies) else {
            internalQueue.async(using: dependencies) { [weak self] in
                self?.runNextJob(using: dependencies)
            }
            return
        }
        guard let (nextJob, numJobsRemaining): (Job, Int) = pendingJobsQueue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
            // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag
            if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
                isRunning.mutate { $0 = false }
            }
            
            // Always attempt to schedule the next soonest job (otherwise if enough jobs get started in rapid
            // succession then pending/failed jobs in the database may never get re-started in a concurrent queue)
            scheduleNextSoonestJob(using: dependencies)
            return
        }
        guard let jobExecutor: JobExecutor.Type = executorMap.wrappedValue[nextJob.variant] else {
            SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing executor")
            handleJobFailed(
                nextJob,
                error: JobRunnerError.executorMissing,
                permanentFailure: true,
                using: dependencies
            )
            return
        }
        guard !jobExecutor.requiresThreadId || nextJob.threadId != nil else {
            SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required threadId")
            handleJobFailed(
                nextJob,
                error: JobRunnerError.requiredThreadIdMissing,
                permanentFailure: true,
                using: dependencies
            )
            return
        }
        guard !jobExecutor.requiresInteractionId || nextJob.interactionId != nil else {
            SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required interactionId")
            handleJobFailed(
                nextJob,
                error: JobRunnerError.requiredInteractionIdMissing,
                permanentFailure: true,
                using: dependencies
            )
            return
        }
        guard nextJob.id != nil else {
            SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id")
            handleJobFailed(
                nextJob,
                error: JobRunnerError.jobIdMissing,
                permanentFailure: false,
                using: dependencies
            )
            return
        }
        
        // If the 'nextRunTimestamp' for the job is in the future then don't run it yet
        guard nextJob.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970 else {
            handleJobDeferred(nextJob, using: dependencies)
            return
        }
        
        // Check if the next job has any dependencies
        let dependencyInfo: (expectedCount: Int, jobs: Set) = dependencies.storage.read(using: dependencies) { db in
            let expectedDependencies: Set = try JobDependencies
                .filter(JobDependencies.Columns.jobId == nextJob.id)
                .fetchSet(db)
            let jobDependencies: Set = try Job
                .filter(ids: expectedDependencies.compactMap { $0.dependantId })
                .fetchSet(db)
            
            return (expectedDependencies.count, jobDependencies)
        }
        .defaulting(to: (0, []))
        
        guard dependencyInfo.jobs.count == dependencyInfo.expectedCount else {
            SNLog("[JobRunner] \(queueContext) found job with missing dependencies, removing the job")
            handleJobFailed(
                nextJob,
                error: JobRunnerError.missingDependencies,
                permanentFailure: true,
                using: dependencies
            )
            return
        }
        guard dependencyInfo.jobs.isEmpty else {
            SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
            
            /// Remove all jobs this one is dependant on that aren't currently running from the queue and re-insert them at the start
            /// of the queue
            ///
            /// **Note:** We don't add the current job back to the queue because it should only be re-added if its dependencies
            /// are successfully completed
            let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue)
            let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs
                .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
                .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
            
            pendingJobsQueue.mutate { queue in
                queue = queue
                    .filter { !dependencyJobsNotCurrentlyRunning.contains($0) }
                    .inserting(contentsOf: dependencyJobsNotCurrentlyRunning, at: 0)
            }
            handleJobDeferred(nextJob, using: dependencies)
            return
        }
        
        // Update the state to indicate the particular job is running
        //
        // Note: We need to store 'numJobsRemaining' in its own variable because
        // the 'SNLog' seems to dispatch to its own queue which ends up getting
        // blocked by the JobRunner's queue because 'jobQueue' is Atomic
        var numJobsRunning: Int = 0
        nextTrigger.mutate { trigger in
            trigger?.invalidate()   // Need to invalidate to prevent a memory leak
            trigger = nil
        }
        currentlyRunningJobIds.mutate { currentlyRunningJobIds in
            currentlyRunningJobIds = currentlyRunningJobIds.inserting(nextJob.id)
            numJobsRunning = currentlyRunningJobIds.count
        }
        currentlyRunningJobInfo.mutate { currentlyRunningJobInfo in
            currentlyRunningJobInfo = currentlyRunningJobInfo.setting(
                nextJob.id,
                JobRunner.JobInfo(
                    variant: nextJob.variant,
                    threadId: nextJob.threadId,
                    interactionId: nextJob.interactionId,
                    detailsData: nextJob.details
                )
            )
        }
        SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)")
        
        /// As it turns out Combine doesn't play too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to
        /// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output`
        /// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334)
        ///
        /// Due to this if a job is meant to run on a concurrent queue then we actually want to create a temporary serial queue just for the execution
        /// of that job
        let targetQueue: DispatchQueue = {
            guard executionType == .concurrent else { return internalQueue }
            
            return DispatchQueue(
                label: "\(self.queueContext)-serial",
                qos: self.qosClass,
                attributes: [],
                autoreleaseFrequency: .inherit,
                target: nil
            )
        }()
        
        jobExecutor.run(
            nextJob,
            queue: targetQueue,
            success: handleJobSucceeded,
            failure: handleJobFailed,
            deferred: handleJobDeferred,
            using: dependencies
        )
        
        // If this queue executes concurrently and there are still jobs remaining then immediately attempt
        // to start the next job
        if executionType == .concurrent && numJobsRemaining > 0 {
            internalQueue.async(using: dependencies) { [weak self] in
                self?.runNextJob(using: dependencies)
            }
        }
    }
    
    /// Looks up the `nextRunTimestamp` of a pending job (excluding jobs which are currently running) and either
    /// restarts the queue immediately, sets up a trigger for that timestamp, or calls `onQueueDrained` when
    /// there is nothing left to run (or the queue isn't allowed to start)
    private func scheduleNextSoonestJob(using dependencies: Dependencies) {
        let jobVariants: [Job.Variant] = self.jobVariants
        let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue
        let nextJobTimestamp: TimeInterval? = dependencies.storage.read(using: dependencies) { db in
            try Job
                .filterPendingJobs(
                    variants: jobVariants,
                    excludeFutureJobs: false,
                    includeJobsWithDependencies: false
                )
                .select(.nextRunTimestamp)
                .filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
                .asRequest(of: TimeInterval.self)
                .fetchOne(db)
        }
        
        // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger
        // the 'onQueueDrained' callback and stop
        guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, canStart?(self) == true else {
            if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
                self.onQueueDrained?()
            }
            return
        }
        
        // If the next job isn't scheduled in the future then just restart the JobRunner immediately
        let secondsUntilNextJob: TimeInterval = (nextJobTimestamp - dependencies.dateNow.timeIntervalSince1970)
        
        guard secondsUntilNextJob > 0 else {
            // Only log that the queue is getting restarted if this queue had actually been about to stop
            if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
                let timingString: String = (nextJobTimestamp == 0 ?
                    "that should be in the queue" :
                    "scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s") ago"
                )
                SNLog("[JobRunner] Restarting \(queueContext) immediately for job \(timingString)")
            }
            
            // Trigger the 'start' function to load in any pending jobs that aren't already in the
            // queue (for concurrent queues we want to force them to load in pending jobs and add
            // them to the queue regardless of whether the queue is already running)
            internalQueue.async(using: dependencies) { [weak self] in
                self?.start(forceWhenAlreadyRunning: (self?.executionType == .concurrent), using: dependencies)
            }
            return
        }
        
        // Only schedule a trigger if this queue has actually completed
        guard executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty else { return }
        
        // Setup a trigger
        SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")")
        nextTrigger.mutate { trigger in
            trigger?.invalidate()   // Need to invalidate the old trigger to prevent a memory leak
            trigger = Trigger.create(queue: self, timestamp: nextJobTimestamp, using: dependencies)
        }
    }
    
    // MARK: - Handling Results
    
    /// This function is called when a job succeeds
    ///
    /// - Parameters:
    ///   - job: The job which succeeded
    ///   - shouldStop: For `recurring` jobs, `true` results in the job (and the dependency records pointing
    ///   at it) being deleted rather than rescheduled
    ///   - dependencies: The dependencies used for storage, time and queue access
    private func handleJobSucceeded(
        _ job: Job,
        shouldStop: Bool,
        using dependencies: Dependencies
    ) {
        /// Retrieve the dependant jobs first (the `JobDependencies` table has cascading deletion when the original `Job` is
        /// removed so we need to retrieve these records before that happens)
        let dependantJobs: [Job] = dependencies.storage
            .read(using: dependencies) { db in try job.dependantJobs.fetchAll(db) }
            .defaulting(to: [])
        
        switch job.behaviour {
            case .runOnce, .runOnceNextLaunch:
                dependencies.storage.write(using: dependencies) { db in
                    /// Since this job has been completed we can update the dependencies so other job that were dependant
                    /// on this one can be run
                    _ = try JobDependencies
                        .filter(JobDependencies.Columns.dependantId == job.id)
                        .deleteAll(db)
                    
                    _ = try job.delete(db)
                }
                
            case .recurring where shouldStop == true:
                dependencies.storage.write(using: dependencies) { db in
                    /// Since this job has been completed we can update the dependencies so other job that were dependant
                    /// on this one can be run
                    _ = try JobDependencies
                        .filter(JobDependencies.Columns.dependantId == job.id)
                        .deleteAll(db)
                    
                    _ = try job.delete(db)
                }
                
            /// For `recurring` jobs which have already run, they should automatically run again but we want at least 1 second
            /// to pass before doing so - the job itself should really update its own `nextRunTimestamp` (this is just a safety net)
            case .recurring where job.nextRunTimestamp <= dependencies.dateNow.timeIntervalSince1970:
                guard let jobId: Int64 = job.id else { break }
                
                dependencies.storage.write(using: dependencies) { db in
                    _ = try Job
                        .filter(id: jobId)
                        .updateAll(
                            db,
                            Job.Columns.failureCount.set(to: 0),
                            Job.Columns.nextRunTimestamp.set(to: (dependencies.dateNow.timeIntervalSince1970 + 1))
                        )
                }
                
            /// For `recurringOnLaunch/Active` jobs which have already run but failed once, we need to clear their
            /// `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over and over again
            case .recurringOnLaunch, .recurringOnActive:
                guard
                    let jobId: Int64 = job.id,
                    job.failureCount != 0 &&
                    job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude
                else { break }
                
                dependencies.storage.write(using: dependencies) { db in
                    _ = try Job
                        .filter(id: jobId)
                        .updateAll(
                            db,
                            Job.Columns.failureCount.set(to: 0),
                            Job.Columns.nextRunTimestamp.set(to: 0)
                        )
                }
                
            default: break
        }
        
        /// Now that the job has been completed we want to insert any jobs that were dependant on it, that aren't already running
        /// to the start of the queue (the most likely case is that we want an entire job chain to be completed at the same time rather
        /// than being blocked by other unrelated jobs)
        ///
        /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
        /// removed from the queue, replaced by their dependencies
        if !dependantJobs.isEmpty {
            let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue)
            let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs
                .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
                .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
            
            pendingJobsQueue.mutate { queue in
                queue = queue
                    .filter { !dependantJobsNotCurrentlyRunning.contains($0) }
                    .inserting(contentsOf: dependantJobsNotCurrentlyRunning, at: 0)
            }
        }
        
        // Perform job cleanup and start the next job
        performCleanUp(for: job, result: .succeeded, using: dependencies)
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
    }
    
    /// This function is called when a job fails, if it wasn't a permanent failure then the 'failureCount' for the job will be incremented and it'll
    /// be re-run after a retry interval has passed
    ///
    /// - Parameters:
    ///   - job: The job which failed
    ///   - error: The error which caused the failure (if there was one)
    ///   - permanentFailure: When `true` the job and its dependant jobs are deleted instead of being retried
    ///   - dependencies: The dependencies used for storage, time and queue access
    private func handleJobFailed(
        _ job: Job,
        error: Error?,
        permanentFailure: Bool,
        using dependencies: Dependencies
    ) {
        // If the job no longer exists in the database then treat it as cancelled (just clean up
        // and move on to the next job)
        guard dependencies.storage.read(using: dependencies, { db in try Job.exists(db, id: job.id ?? -1) }) == true else {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
            performCleanUp(for: job, result: .failed, using: dependencies)
            
            internalQueue.async(using: dependencies) { [weak self] in
                self?.runNextJob(using: dependencies)
            }
            return
        }
        
        // If this is the blocking queue and a "blocking" job failed then rerun it
        // immediately (in this case we don't trigger any job callbacks because the
        // job isn't actually done, it's going to try again immediately)
        if self.type == .blocking && job.shouldBlock {
            SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
            
            // If it was a possible deferral loop then we don't actually want to
            // retry the job (even if it's a blocking one, this gives a small chance
            // that the app could continue to function)
            let wasPossibleDeferralLoop: Bool = {
                if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }
                
                return false
            }()
            performCleanUp(
                for: job,
                result: .failed,
                shouldTriggerCallbacks: wasPossibleDeferralLoop,
                using: dependencies
            )
            
            // Only add it back to the queue if it wasn't a deferral loop
            if !wasPossibleDeferralLoop {
                pendingJobsQueue.mutate { $0.insert(job, at: 0) }
            }
            
            internalQueue.async(using: dependencies) { [weak self] in
                self?.runNextJob(using: dependencies)
            }
            return
        }
        
        // Get the max failure count for the job (a value of '-1' means it will retry indefinitely)
        let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0)
        let nextRunTimestamp: TimeInterval = (dependencies.dateNow.timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
        var dependantJobIds: [Int64] = []
        var failureText: String = "failed"
        
        dependencies.storage.write(using: dependencies) { db in
            /// Retrieve a list of dependant jobs so we can clear them from the queue
            dependantJobIds = try job.dependantJobs
                .select(.id)
                .asRequest(of: Int64.self)
                .fetchAll(db)
            
            /// Delete/update the failed jobs and any dependencies
            let updatedFailureCount: UInt = (job.failureCount + 1)
            
            guard
                !permanentFailure && (
                    maxFailureCount < 0 ||
                    updatedFailureCount <= maxFailureCount
                )
            else {
                failureText = (maxFailureCount >= 0 && updatedFailureCount > maxFailureCount ?
                    "failed permanently; too many retries" :
                    "failed permanently"
                )
                
                // If the job permanently failed or we have performed all of our retry attempts
                // then delete the job and all of it's dependant jobs (it'll probably never succeed)
                _ = try job.dependantJobs
                    .deleteAll(db)
                
                _ = try job.delete(db)
                return
            }
            
            failureText = "failed; scheduling retry (failure count is \(updatedFailureCount))"
            
            _ = try job
                .with(
                    failureCount: updatedFailureCount,
                    nextRunTimestamp: nextRunTimestamp
                )
                .saved(db)
            
            // Update the failureCount and nextRunTimestamp on dependant jobs as well (update the
            // 'nextRunTimestamp' value to be 1ms later so when the queue gets regenerated they'll
            // come after the dependency)
            try job.dependantJobs
                .updateAll(
                    db,
                    Job.Columns.failureCount.set(to: updatedFailureCount),
                    Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
                )
        }
        
        /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
        /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
        if !dependantJobIds.isEmpty {
            pendingJobsQueue.mutate { queue in
                queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
            }
        }
        
        SNLog("[JobRunner] \(queueContext) \(job.variant) job \(failureText)")
        performCleanUp(for: job, result: .failed, using: dependencies)
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
    }
    
    /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant
    /// on other jobs, and it should automatically manage those dependencies)
    ///
    /// Deferrals are counted per job id in `deferLoopTracker`; if a job looks to be stuck in a deferral loop it is
    /// passed to `handleJobFailed` with a `possibleDeferralLoop` error instead of being deferred again
    public func handleJobDeferred(
        _ job: Job,
        using dependencies: Dependencies
    ) {
        var stuckInDeferLoop: Bool = false
        
        deferLoopTracker.mutate {
            guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
                // First deferral recorded for this job
                $0 = $0.setting(
                    job.id,
                    (1, [dependencies.dateNow.timeIntervalSince1970])
                )
                return
            }
            
            let timeNow: TimeInterval = dependencies.dateNow.timeIntervalSince1970
            stuckInDeferLoop = (
                lastRecord.count >= JobQueue.deferralLoopThreshold &&
                (timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count * maxDeferralsPerSecond)
            )
            
            $0 = $0.setting(
                job.id,
                (
                    lastRecord.count + 1,
                    // Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster
                    // than one loop per second
                    lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow]
                )
            )
        }
        
        // It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately
        // defers itself but then attempts to run again (resulting in an infinite loop); this won't block
        // the app since it's on a background thread but can result in 100% of a CPU being used (and a
        // battery drain)
        //
        // This code will maintain an in-memory store for any jobs which are deferred too quickly (ie.
        // more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds)
        guard !stuckInDeferLoop else {
            deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) }
            handleJobFailed(
                job,
                error: JobRunnerError.possibleDeferralLoop,
                permanentFailure: false,
                using: dependencies
            )
            return
        }
        
        performCleanUp(for: job, result: .deferred, using: dependencies)
        internalQueue.async(using: dependencies) { [weak self] in
            self?.runNextJob(using: dependencies)
        }
    }
    
    /// Removes the job from the "currently running" state and (optionally) runs any callbacks which were
    /// registered for it
    ///
    /// - Parameters:
    ///   - job: The job which has finished its current attempt
    ///   - result: The result passed to each registered callback
    ///   - shouldTriggerCallbacks: When `false` the registered callbacks are left untouched and not run
    ///   - dependencies: The dependencies used when dispatching the callbacks
    private func performCleanUp(
        for job: Job,
        result: JobRunner.JobResult,
        shouldTriggerCallbacks: Bool = true,
        using dependencies: Dependencies
    ) {
        // The job is removed from the queue before it runs so all we need to do is remove it
        // from the 'currentlyRunning' set
        currentlyRunningJobIds.mutate { $0 = $0.removing(job.id) }
        currentlyRunningJobInfo.mutate { $0 = $0.removingValue(forKey: job.id) }
        
        guard shouldTriggerCallbacks else { return }
        
        // Run any job callbacks now that it's done
        var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = []
        jobCallbacks.mutate { jobCallbacks in
            jobCallbacksToRun = (jobCallbacks[job.id] ?? [])
            jobCallbacks = jobCallbacks.removingValue(forKey: job.id)
        }
        
        DispatchQueue.global(qos: .default).async(using: dependencies) {
            jobCallbacksToRun.forEach { $0(result) }
        }
    }
}

// MARK: - JobRunner Singleton
// FIXME: Remove this once the jobRunner is dependency injected everywhere correctly

public extension JobRunner {
    /// The shared instance which all of the static functions below forward to
    internal static let instance: JobRunner = JobRunner()
    
    // MARK: - Static Access
    
    /// Forwards to `setExecutor(_:for:)` on the shared instance
    static func setExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) {
        instance.setExecutor(executor, for: variant)
    }
    
    /// Forwards to `appDidFinishLaunching(using:)` on the shared instance
    static func appDidFinishLaunching(using dependencies: Dependencies = Dependencies()) {
        instance.appDidFinishLaunching(using: dependencies)
    }
    
    /// Forwards to `appDidBecomeActive(using:)` on the shared instance
    static func appDidBecomeActive(using dependencies: Dependencies = Dependencies()) {
        instance.appDidBecomeActive(using: dependencies)
    }
    
    /// Forwards to `afterBlockingQueue(callback:)` on the shared instance
    static func afterBlockingQueue(callback: @escaping () -> ()) {
        instance.afterBlockingQueue(callback: callback)
    }
    
    /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
    /// the JobRunner
    ///
    /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
    /// is in the future then the job won't be started
    static func add(
        _ db: Database,
        job: Job?,
        canStartJob: Bool = true,
        using dependencies: Dependencies = Dependencies()
    ) {
        instance.add(db, job: job, canStartJob: canStartJob, using: dependencies)
    }
    
    /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
    /// the JobRunner
    ///
    /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
    /// is in the future then the job won't be started
    static func upsert(
        _ db: Database,
        job: Job?,
        canStartJob: Bool = true,
        using dependencies: Dependencies = Dependencies()
    ) {
        instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies)
    }
    
    /// Forwards to `insert(_:job:before:)` on the shared instance, returning its result
    @discardableResult static func insert(
        _ db: Database,
        job: Job?,
        before otherJob: Job
    ) -> (Int64, Job)? {
        instance.insert(db, job: job, before: otherJob)
    }
    
    /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run
    /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their
    /// failure - they _should_ be picked up again the next time the app is launched)
    static func stopAndClearPendingJobs(
        exceptForVariant: Job.Variant? = nil,
        onComplete: (() -> ())? = nil
    ) {
        instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete)
    }
    
    /// Forwards to `isCurrentlyRunning(_:)` on the shared instance
    static func isCurrentlyRunning(_ job: Job?) -> Bool {
        return instance.isCurrentlyRunning(job)
    }
    
    /// Forwards to `afterCurrentlyRunningJob(_:callback:)` on the shared instance
    static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
        instance.afterCurrentlyRunningJob(job, callback: callback)
    }
    
    /// Forwards to `removePendingJob(_:)` on the shared instance
    static func removePendingJob(_ job: Job?) {
        instance.removePendingJob(job)
    }
}