diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index 512fbb126..e8773d8c6 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -825,6 +825,7 @@ FDD2506E283711D600198BDA /* DifferenceKit+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDD2506D283711D600198BDA /* DifferenceKit+Utilities.swift */; }; FDD250702837199200198BDA /* GarbageCollectionJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDD2506F2837199200198BDA /* GarbageCollectionJob.swift */; }; FDD250722837234B00198BDA /* MediaGalleryNavigationController.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDD250712837234B00198BDA /* MediaGalleryNavigationController.swift */; }; + FDDF074A29DAB36900E5E8B5 /* JobRunnerSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDDF074929DAB36900E5E8B5 /* JobRunnerSpec.swift */; }; FDE77F6B280FEB28002CFC5D /* ControlMessageProcessRecord.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDE77F6A280FEB28002CFC5D /* ControlMessageProcessRecord.swift */; }; FDED2E3C282E1B5D00B2CD2A /* UICollectionView+ReusableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDED2E3B282E1B5D00B2CD2A /* UICollectionView+ReusableView.swift */; }; FDF0B73C27FFD3D6004C14C5 /* LinkPreview.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDF0B73B27FFD3D6004C14C5 /* LinkPreview.swift */; }; @@ -1902,6 +1903,7 @@ FDD2506D283711D600198BDA /* DifferenceKit+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "DifferenceKit+Utilities.swift"; sourceTree = ""; }; FDD2506F2837199200198BDA /* GarbageCollectionJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GarbageCollectionJob.swift; sourceTree = ""; }; FDD250712837234B00198BDA /* MediaGalleryNavigationController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MediaGalleryNavigationController.swift; sourceTree = ""; }; + FDDF074929DAB36900E5E8B5 /* JobRunnerSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = JobRunnerSpec.swift; sourceTree = ""; }; FDE7214F287E50D50093DF33 /* ProtoWrappers.py */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.script.python; path = ProtoWrappers.py; sourceTree = ""; }; FDE72150287E50D50093DF33 /* LintLocalizableStrings.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = LintLocalizableStrings.swift; sourceTree = ""; }; FDE77F68280F9EDA002CFC5D /* JobRunnerError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = JobRunnerError.swift; sourceTree = ""; }; @@ -3908,6 +3910,7 @@ children = ( FD37EA1228AB3F60003AE748 /* Database */, FD83B9B927CF20A5005E1583 /* General */, + FDDF074829DAB35200E5E8B5 /* JobRunner */, ); path = SessionUtilitiesKitTests; sourceTree = ""; @@ -4112,6 +4115,14 @@ path = Models; sourceTree = ""; }; + FDDF074829DAB35200E5E8B5 /* JobRunner */ = { + isa = PBXGroup; + children = ( + FDDF074929DAB36900E5E8B5 /* JobRunnerSpec.swift */, + ); + path = JobRunner; + sourceTree = ""; + }; FDE7214E287E50D50093DF33 /* Scripts */ = { isa = PBXGroup; children = ( @@ -5804,6 +5815,7 @@ FD2AAAEE28ED3E1100A49611 /* MockGeneralCache.swift in Sources */, FD37EA1528AB42CB003AE748 /* IdentitySpec.swift in Sources */, FD1A94FE2900D2EA000D73D3 /* PersistableRecordUtilitiesSpec.swift in Sources */, + FDDF074A29DAB36900E5E8B5 /* JobRunnerSpec.swift in Sources */, FDC290AA27D9B6FD005DAE71 /* Mock.swift in Sources */, ); runOnlyForDeploymentPostprocessing = 0; diff --git a/Session/Notifications/SyncPushTokensJob.swift b/Session/Notifications/SyncPushTokensJob.swift index 61fb77c80..e6206b300 100644 --- a/Session/Notifications/SyncPushTokensJob.swift +++ b/Session/Notifications/SyncPushTokensJob.swift @@ -16,16 +16,17 @@ public enum SyncPushTokensJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // Don't run when inactive or not in main app or if the user doesn't exist yet guard (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false), Identity.userExists() else { - deferred(job) // Don't need to do anything if it's not the main app + deferred(job, dependencies) // Don't need to do anything if it's not the main app return } @@ -33,7 +34,7 @@ public enum SyncPushTokensJob: JobExecutor { // the main thread then swap to it guard Thread.isMainThread else { DispatchQueue.main.async { - run(job, queue: queue, success: success, failure: failure, deferred: deferred) + run(job, queue: queue, success: success, failure: failure, deferred: deferred, dependencies: dependencies) } return } @@ -61,7 +62,7 @@ public enum SyncPushTokensJob: JobExecutor { !UIApplication.shared.isRegisteredForRemoteNotifications || Date().timeIntervalSince(lastPushNotificationSync) >= SyncPushTokensJob.maxFrequency else { - deferred(job) // Don't need to do anything if push notifications are already registered + deferred(job, dependencies) // Don't need to do anything if push notifications are already registered return } @@ -90,7 +91,9 @@ public enum SyncPushTokensJob: JobExecutor { } } } - .ensure(on: queue) { success(job, false) } // We want to complete this job regardless of success or failure + .ensure(on: queue) { + success(job, false, dependencies) // We want to complete this job regardless of success or failure + } .retainUntilComplete() } @@ -107,9 +110,9 @@ public enum SyncPushTokensJob: JobExecutor { SyncPushTokensJob.run( job, queue: DispatchQueue.global(qos: .default), - success: { _, _ in }, - failure: { _, _, _ in }, - deferred: { _ in } + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in } ) } } diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index 391f8005a..af1267fdd 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -171,9 +171,9 @@ public final class BackgroundPoller { MessageReceiveJob.run( job, queue: DispatchQueue.main, - success: { _, _ in seal.fulfill(()) }, - failure: { _, _, _ in seal.fulfill(()) }, - deferred: { _ in seal.fulfill(()) } + success: { _, _, _ in seal.fulfill(()) }, + failure: { _, _, _, _ in seal.fulfill(()) }, + deferred: { _, _ in seal.fulfill(()) } ) return promise diff --git a/SessionMessagingKit/Database/Models/Profile.swift b/SessionMessagingKit/Database/Models/Profile.swift index 38f7c4cd9..0a32ea093 100644 --- a/SessionMessagingKit/Database/Models/Profile.swift +++ b/SessionMessagingKit/Database/Models/Profile.swift @@ -258,10 +258,10 @@ public extension Profile { /// /// **Note:** This method intentionally does **not** save the newly created Profile, /// it will need to be explicitly saved after calling - static func fetchOrCreateCurrentUser() -> Profile { + static func fetchOrCreateCurrentUser(dependencies: Dependencies = Dependencies()) -> Profile { var userPublicKey: String = "" - let exisingProfile: Profile? = Storage.shared.read { db in + let exisingProfile: Profile? = dependencies.storage.read { db in userPublicKey = getUserHexEncodedPublicKey(db) return try Profile.fetchOne(db, id: userPublicKey) diff --git a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift index a3588e511..14934a4e7 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentDownloadJob.swift @@ -14,9 +14,10 @@ public enum AttachmentDownloadJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let threadId: String = job.threadId, @@ -25,7 +26,7 @@ public enum AttachmentDownloadJob: JobExecutor { let attachment: Attachment = Storage.shared .read({ db in try Attachment.fetchOne(db, id: details.attachmentId) }) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } @@ -33,7 +34,7 @@ public enum AttachmentDownloadJob: JobExecutor { // an AttachmentDownloadJob to get created for an attachment which has already been // downloaded/uploaded so in those cases just succeed immediately guard attachment.state != .downloaded && attachment.state != .uploaded else { - success(job, false) + success(job, false, dependencies) return } @@ -42,7 +43,7 @@ public enum AttachmentDownloadJob: JobExecutor { // if an attachment ends up stuck in a "downloading" state incorrectly guard attachment.state != .downloading else { let otherCurrentJobAttachmentIds: Set = JobRunner - .defailsForCurrentlyRunningJobs(of: .attachmentDownload) + .detailsForCurrentlyRunningJobs(of: .attachmentDownload) .filter { key, _ in key != job.id } .values .compactMap { data -> String? in @@ -57,7 +58,7 @@ public enum AttachmentDownloadJob: JobExecutor { // then we should update the state of the attachment to be failed to avoid having attachments // appear in an endlessly downloading state if !otherCurrentJobAttachmentIds.contains(attachment.id) { - Storage.shared.write { db in + dependencies.storage.write { db in _ = try Attachment .filter(id: attachment.id) .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.failedDownload)) @@ -70,12 +71,12 @@ public enum AttachmentDownloadJob: JobExecutor { // If there is another current job then just fail this one permanently, otherwise let it // retry (if there are more retry attempts available) and in the next retry it's state should // be 'failedDownload' so we won't get stuck in a loop - failure(job, nil, otherCurrentJobAttachmentIds.contains(attachment.id)) + failure(job, nil, otherCurrentJobAttachmentIds.contains(attachment.id), dependencies) return } // Update to the 'downloading' state (no need to update the 'attachment' instance) - Storage.shared.write { db in + dependencies.storage.write { db in try Attachment .filter(id: attachment.id) .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.downloading)) @@ -141,7 +142,7 @@ public enum AttachmentDownloadJob: JobExecutor { /// /// **Note:** We **MUST** use the `'with()` function here as it will update the /// `isValid` and `duration` values based on the downloaded data and the state - Storage.shared.write { db in + dependencies.storage.write { db in _ = try attachment .with( state: .downloaded, @@ -154,7 +155,7 @@ public enum AttachmentDownloadJob: JobExecutor { .saved(db) } - success(job, false) + success(job, false, dependencies) } .catch(on: queue) { error in OWSFileSystem.deleteFile(temporaryFileUrl.path) @@ -188,14 +189,14 @@ public enum AttachmentDownloadJob: JobExecutor { /// /// **Note:** We **MUST** use the `'with()` function here as it will update the /// `isValid` and `duration` values based on the downloaded data and the state - Storage.shared.write { db in + dependencies.storage.write { db in _ = try Attachment .filter(id: attachment.id) .updateAll(db, Attachment.Columns.state.set(to: targetState)) } /// Trigger the failure and provide the `permanentFailure` value defined above - failure(job, error, permanentFailure) + failure(job, error, permanentFailure, dependencies) } } } diff --git a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift index 88e72891f..eb47948fc 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift @@ -14,16 +14,17 @@ public enum AttachmentUploadJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let threadId: String = job.threadId, let interactionId: Int64 = job.interactionId, let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData), - let (attachment, openGroup): (Attachment, OpenGroup?) = Storage.shared.read({ db in + let (attachment, openGroup): (Attachment, OpenGroup?) = dependencies.storage.read({ db in guard let attachment: Attachment = try Attachment.fetchOne(db, id: details.attachmentId) else { return nil } @@ -31,20 +32,20 @@ public enum AttachmentUploadJob: JobExecutor { return (attachment, try OpenGroup.fetchOne(db, id: threadId)) }) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } // If the original interaction no longer exists then don't bother uploading the attachment (ie. the // message was deleted before it even got sent) - guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { - failure(job, StorageError.objectNotFound, true) + guard dependencies.storage.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { + failure(job, StorageError.objectNotFound, true, dependencies) return } // If the attachment is still pending download the hold off on running this job guard attachment.state != .pendingDownload && attachment.state != .downloading else { - deferred(job) + deferred(job, dependencies) return } @@ -71,8 +72,8 @@ public enum AttachmentUploadJob: JobExecutor { .map { response -> String in response.id } }, encrypt: (openGroup == nil), - success: { _ in success(job, false) }, - failure: { error in failure(job, error, false) } + success: { _ in success(job, false, dependencies) }, + failure: { error in failure(job, error, false, dependencies) } ) } } diff --git a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift index d294c27b3..d1dc38b27 100644 --- a/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift +++ b/SessionMessagingKit/Jobs/Types/DisappearingMessagesJob.swift @@ -13,15 +13,16 @@ public enum DisappearingMessagesJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // The 'backgroundTask' gets captured and cleared within the 'completion' block let timestampNowMs: TimeInterval = TimeInterval(SnodeAPI.currentOffsetTimestampMs()) var backgroundTask: OWSBackgroundTask? = OWSBackgroundTask(label: #function) - let updatedJob: Job? = Storage.shared.write { db in + let updatedJob: Job? = dependencies.storage.write { db in _ = try Interaction .filter(Interaction.Columns.expiresStartedAtMs != nil) .filter((Interaction.Columns.expiresStartedAtMs + (Interaction.Columns.expiresInSeconds * 1000)) <= timestampNowMs) @@ -35,7 +36,7 @@ public enum DisappearingMessagesJob: JobExecutor { .saved(db) } - success(updatedJob ?? job, false) + success(updatedJob ?? job, false, dependencies) // The 'if' is only there to prevent the "variable never read" warning from showing if backgroundTask != nil { backgroundTask = nil } diff --git a/SessionMessagingKit/Jobs/Types/FailedAttachmentDownloadsJob.swift b/SessionMessagingKit/Jobs/Types/FailedAttachmentDownloadsJob.swift index a2d921eee..7c1473f00 100644 --- a/SessionMessagingKit/Jobs/Types/FailedAttachmentDownloadsJob.swift +++ b/SessionMessagingKit/Jobs/Types/FailedAttachmentDownloadsJob.swift @@ -13,12 +13,13 @@ public enum FailedAttachmentDownloadsJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // Update all 'sending' message states to 'failed' - Storage.shared.write { db in + dependencies.storage.write { db in let changeCount: Int = try Attachment .filter(Attachment.Columns.state == Attachment.State.downloading) .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.failedDownload)) @@ -26,6 +27,6 @@ public enum FailedAttachmentDownloadsJob: JobExecutor { Logger.debug("Marked \(changeCount) attachments as failed") } - success(job, false) + success(job, false, dependencies) } } diff --git a/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift b/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift index bdb53d53a..4b4d5c4d1 100644 --- a/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift +++ b/SessionMessagingKit/Jobs/Types/FailedMessageSendsJob.swift @@ -13,12 +13,13 @@ public enum FailedMessageSendsJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // Update all 'sending' message states to 'failed' - Storage.shared.write { db in + dependencies.storage.write { db in let sendChangeCount: Int = try RecipientState .filter(RecipientState.Columns.state == RecipientState.State.sending) .updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.failed)) @@ -33,6 +34,6 @@ public enum FailedMessageSendsJob: JobExecutor { SNLog("Marked \(changeCount) message\(changeCount == 1 ? "" : "s") as failed (\(attachmentChangeCount) upload\(attachmentChangeCount == 1 ? "" : "s") cancelled)") } - success(job, false) + success(job, false, dependencies) } } diff --git a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift index 512e61bfd..46d776cd3 100644 --- a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift +++ b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift @@ -21,9 +21,10 @@ public enum GarbageCollectionJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { /// Determine what types of data we want to collect (if we didn't provide any then assume we want to collect everything) /// @@ -57,7 +58,7 @@ public enum GarbageCollectionJob: JobExecutor { return typesToCollect.asSet() }() - Storage.shared.writeAsync( + dependencies.storage.writeAsync( updates: { db in /// Remove any typing indicators if finalTypesToCollect.contains(.threadTypingIndicators) { @@ -339,7 +340,7 @@ public enum GarbageCollectionJob: JobExecutor { // If we couldn't get the file lists then fail (invalid state and don't want to delete all attachment/profile files) guard let fileInfo: FileInfo = maybeFileInfo else { - failure(job, StorageError.generic, false) + failure(job, StorageError.generic, false, dependencies) return } @@ -414,7 +415,7 @@ public enum GarbageCollectionJob: JobExecutor { // Report a single file deletion as a job failure (even if other content was successfully removed) guard deletionErrors.isEmpty else { - failure(job, (deletionErrors.first ?? StorageError.generic), false) + failure(job, (deletionErrors.first ?? StorageError.generic), false, dependencies) return } @@ -424,7 +425,7 @@ public enum GarbageCollectionJob: JobExecutor { UserDefaults.standard[.lastGarbageCollection] = Date() } - success(job, false) + success(job, false, dependencies) } } ) diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index 907f6af8d..e12265c18 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -13,22 +13,23 @@ public enum MessageReceiveJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } var updatedJob: Job = job var leastSevereError: Error? - Storage.shared.write { db in + dependencies.storage.write { db in var remainingMessagesToProcess: [Details.MessageInfo] = [] for messageInfo in details.messages { @@ -86,13 +87,13 @@ public enum MessageReceiveJob: JobExecutor { // Handle the result switch leastSevereError { case let error as MessageReceiverError where !error.isRetryable: - failure(updatedJob, error, true) + failure(updatedJob, error, true, dependencies) case .some(let error): - failure(updatedJob, error, false) + failure(updatedJob, error, false, dependencies) case .none: - success(updatedJob, false) + success(updatedJob, false, dependencies) } } } diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index bae02f89c..050dc1bfa 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -15,15 +15,16 @@ public enum MessageSendJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } @@ -36,14 +37,14 @@ public enum MessageSendJob: JobExecutor { let jobId: Int64 = job.id, let interactionId: Int64 = job.interactionId else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } // If the original interaction no longer exists then don't bother sending the message (ie. the // message was deleted before it even got sent) - guard Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { - failure(job, StorageError.objectNotFound, true) + guard dependencies.storage.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { + failure(job, StorageError.objectNotFound, true, dependencies) return } @@ -52,7 +53,7 @@ public enum MessageSendJob: JobExecutor { // // Note: Normal attachments should be sent in a non-durable way but any // attachments for LinkPreviews and Quotes will be processed through this mechanism - let attachmentState: (shouldFail: Bool, shouldDefer: Bool, fileIds: [String])? = Storage.shared.write { db in + let attachmentState: (shouldFail: Bool, shouldDefer: Bool, fileIds: [String])? = dependencies.storage.write { db in let allAttachmentStateInfo: [Attachment.StateInfo] = try Attachment .stateInfo(interactionId: interactionId) .fetchAll(db) @@ -110,7 +111,8 @@ public enum MessageSendJob: JobExecutor { attachmentId: stateInfo.attachmentId ) ), - before: job + before: job, + dependencies: dependencies ) } .forEach { otherJobId, _ in @@ -140,13 +142,13 @@ public enum MessageSendJob: JobExecutor { // Note: If we have gotten to this point then any dependant attachment upload // jobs will have permanently failed so this message send should also do so guard attachmentState?.shouldFail == false else { - failure(job, AttachmentError.notUploaded, true) + failure(job, AttachmentError.notUploaded, true, dependencies) return } // Defer the job if we found incomplete uploads guard attachmentState?.shouldDefer == false else { - deferred(job) + deferred(job, dependencies) return } @@ -161,7 +163,7 @@ public enum MessageSendJob: JobExecutor { details.message.threadId = (details.message.threadId ?? job.threadId) // Perform the actual message sending - Storage.shared.writeAsync { db -> Promise in + dependencies.storage.writeAsync { db -> Promise in try MessageSender.sendImmediate( db, message: details.message, @@ -171,20 +173,20 @@ public enum MessageSendJob: JobExecutor { isSyncMessage: (details.isSyncMessage == true) ) } - .done(on: queue) { _ in success(job, false) } + .done(on: queue) { _ in success(job, false, dependencies) } .catch(on: queue) { error in SNLog("Couldn't send message due to error: \(error).") switch error { case let senderError as MessageSenderError where !senderError.isRetryable: - failure(job, error, true) + failure(job, error, true, dependencies) case OnionRequestAPIError.httpRequestFailedAtDestination(let statusCode, _, _) where statusCode == 429: // Rate limited - failure(job, error, true) + failure(job, error, true, dependencies) case SnodeAPIError.clockOutOfSync: SNLog("\(originalSentTimestamp != nil ? "Permanently Failing" : "Failing") to send \(type(of: details.message)) due to clock out of sync issue.") - failure(job, error, (originalSentTimestamp != nil)) + failure(job, error, (originalSentTimestamp != nil), dependencies) default: SNLog("Failed to send \(type(of: details.message)).") @@ -192,15 +194,15 @@ public enum MessageSendJob: JobExecutor { if details.message is VisibleMessage { guard let interactionId: Int64 = job.interactionId, - Storage.shared.read({ db in try Interaction.exists(db, id: interactionId) }) == true + dependencies.storage.read({ db in try Interaction.exists(db, id: interactionId) }) == true else { // The message has been deleted so permanently fail the job - failure(job, error, true) + failure(job, error, true, dependencies) return } } - failure(job, error, false) + failure(job, error, false, dependencies) } } .retainUntilComplete() diff --git a/SessionMessagingKit/Jobs/Types/NotifyPushServerJob.swift b/SessionMessagingKit/Jobs/Types/NotifyPushServerJob.swift index 63885541a..3be8e20ab 100644 --- a/SessionMessagingKit/Jobs/Types/NotifyPushServerJob.swift +++ b/SessionMessagingKit/Jobs/Types/NotifyPushServerJob.swift @@ -13,15 +13,16 @@ public enum NotifyPushServerJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } @@ -32,8 +33,8 @@ public enum NotifyPushServerJob: JobExecutor { maxRetryCount: 4, queue: queue ) - .done(on: queue) { _ in success(job, false) } - .catch(on: queue) { error in failure(job, error, false) } + .done(on: queue) { _ in success(job, false, dependencies) } + .catch(on: queue) { error in failure(job, error, false, dependencies) } .retainUntilComplete() } } diff --git a/SessionMessagingKit/Jobs/Types/RetrieveDefaultOpenGroupRoomsJob.swift b/SessionMessagingKit/Jobs/Types/RetrieveDefaultOpenGroupRoomsJob.swift index 01c244019..87c09ae50 100644 --- a/SessionMessagingKit/Jobs/Types/RetrieveDefaultOpenGroupRoomsJob.swift +++ b/SessionMessagingKit/Jobs/Types/RetrieveDefaultOpenGroupRoomsJob.swift @@ -13,13 +13,14 @@ public enum RetrieveDefaultOpenGroupRoomsJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // Don't run when inactive or not in main app guard (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) else { - deferred(job) // Don't need to do anything if it's not the main app + deferred(job, dependencies) // Don't need to do anything if it's not the main app return } @@ -27,7 +28,7 @@ public enum RetrieveDefaultOpenGroupRoomsJob: JobExecutor { // in the database so we need to create a dummy one to retrieve the default room data let defaultGroupId: String = OpenGroup.idFor(roomToken: "", server: OpenGroupAPI.defaultServer) - Storage.shared.write { db in + dependencies.storage.write { db in guard try OpenGroup.exists(db, id: defaultGroupId) == false else { return } _ = try OpenGroup( @@ -43,8 +44,8 @@ public enum RetrieveDefaultOpenGroupRoomsJob: JobExecutor { } OpenGroupManager.getDefaultRoomsIfNeeded() - .done(on: queue) { _ in success(job, false) } - .catch(on: queue) { error in failure(job, error, false) } + .done(on: queue) { _ in success(job, false, dependencies) } + .catch(on: queue) { error in failure(job, error, false, dependencies) } .retainUntilComplete() } } diff --git a/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift b/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift index bdb684869..0082bd277 100644 --- a/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift +++ b/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift @@ -14,16 +14,17 @@ public enum SendReadReceiptsJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { guard let threadId: String = job.threadId, let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { - failure(job, JobRunnerError.missingRequiredDetails, false) + failure(job, JobRunnerError.missingRequiredDetails, false, dependencies) return } @@ -31,11 +32,11 @@ public enum SendReadReceiptsJob: JobExecutor { // something is marked as read we want to try and run immediately so don't scuedule // another run in this case) guard !details.timestampMsValues.isEmpty else { - success(job, true) + success(job, true, dependencies) return } - Storage.shared + dependencies.storage .writeAsync { db in try MessageSender.sendImmediate( db, @@ -54,7 +55,7 @@ public enum SendReadReceiptsJob: JobExecutor { var shouldFinishCurrentJob: Bool = false let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + minRunFrequency) - let updatedJob: Job? = Storage.shared.write { db in + let updatedJob: Job? = dependencies.storage.write { db in // If another 'sendReadReceipts' job was scheduled then update that one // to run at 'nextRunTimestamp' and make the current job stop if @@ -79,9 +80,9 @@ public enum SendReadReceiptsJob: JobExecutor { .saved(db) } - success(updatedJob ?? job, shouldFinishCurrentJob) + success(updatedJob ?? job, shouldFinishCurrentJob, dependencies) } - .catch(on: queue) { error in failure(job, error, false) } + .catch(on: queue) { error in failure(job, error, false, dependencies) } .retainUntilComplete() } } diff --git a/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift b/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift index 0a79dfde9..ee7d69125 100644 --- a/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift +++ b/SessionMessagingKit/Jobs/Types/UpdateProfilePictureJob.swift @@ -13,13 +13,14 @@ public enum UpdateProfilePictureJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // Don't run when inactive or not in main app guard (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) else { - deferred(job) // Don't need to do anything if it's not the main app + deferred(job, dependencies) // Don't need to do anything if it's not the main app return } @@ -31,18 +32,18 @@ public enum UpdateProfilePictureJob: JobExecutor { // Reset the `nextRunTimestamp` value just in case the last run failed so we don't get stuck // in a loop endlessly deferring the job if let jobId: Int64 = job.id { - Storage.shared.write { db in + dependencies.storage.write { db in try Job .filter(id: jobId) .updateAll(db, Job.Columns.nextRunTimestamp.set(to: 0)) } } - deferred(job) + deferred(job, dependencies) return } // Note: The user defaults flag is updated in ProfileManager - let profile: Profile = Profile.fetchOrCreateCurrentUser() + let profile: Profile = Profile.fetchOrCreateCurrentUser(dependencies: dependencies) let profileFilePath: String? = profile.profilePictureFileName .map { ProfileManager.profileAvatarFilepath(filename: $0) } @@ -58,10 +59,10 @@ public enum UpdateProfilePictureJob: JobExecutor { // issue as it will write to the database and this closure is already called within // another database write queue.async { - success(job, false) + success(job, false, dependencies) } }, - failure: { error in failure(job, error, false) } + failure: { error in failure(job, error, false, dependencies) } ) } } diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index 32bb8cbc5..6c02a3ccb 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -270,12 +270,12 @@ public final class MessageSender { NotifyPushServerJob.run( job, queue: DispatchQueue.global(qos: .default), - success: { _, _ in seal.fulfill(()) }, - failure: { _, _, _ in + success: { _, _, _ in seal.fulfill(()) }, + failure: { _, _, _, _ in // Always fulfill because the notify PN server job isn't critical. seal.fulfill(()) }, - deferred: { _ in + deferred: { _, _ in // Always fulfill because the notify PN server job isn't critical. seal.fulfill(()) } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index 369959742..801968e12 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -284,9 +284,9 @@ public final class ClosedGroupPoller { MessageReceiveJob.run( job, queue: queue, - success: { _, _ in seal.fulfill(()) }, - failure: { _, _, _ in seal.fulfill(()) }, - deferred: { _ in seal.fulfill(()) } + success: { _, _, _ in seal.fulfill(()) }, + failure: { _, _, _, _ in seal.fulfill(()) }, + deferred: { _, _ in seal.fulfill(()) } ) return promise diff --git a/SessionSnodeKit/GetSnodePoolJob.swift b/SessionSnodeKit/GetSnodePoolJob.swift index af0c61d07..610101c80 100644 --- a/SessionSnodeKit/GetSnodePoolJob.swift +++ b/SessionSnodeKit/GetSnodePoolJob.swift @@ -13,14 +13,15 @@ public enum GetSnodePoolJob: JobExecutor { public static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies = Dependencies() ) { // If the user doesn't exist then don't do anything (when the user registers we run this // job directly) guard Identity.userExists() else { - deferred(job) + deferred(job, dependencies) return } @@ -30,13 +31,13 @@ public enum GetSnodePoolJob: JobExecutor { // wait if we already have a potentially valid snode pool guard !SnodeAPI.hasCachedSnodesInclusingExpired() else { SnodeAPI.getSnodePool().retainUntilComplete() - success(job, false) + success(job, false, dependencies) return } SnodeAPI.getSnodePool() - .done(on: queue) { _ in success(job, false) } - .catch(on: queue) { error in failure(job, error, false) } + .done(on: queue) { _ in success(job, false, dependencies) } + .catch(on: queue) { error in failure(job, error, false, dependencies) } .retainUntilComplete() } @@ -44,9 +45,9 @@ public enum GetSnodePoolJob: JobExecutor { GetSnodePoolJob.run( Job(variant: .getSnodePool), queue: DispatchQueue.global(qos: .background), - success: { _, _ in }, - failure: { _, _, _ in }, - deferred: { _ in } + success: { _, _, _ in }, + failure: { _, _, _, _ in }, + deferred: { _, _ in } ) } } diff --git a/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift b/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift index b8e849c74..e90b90909 100644 --- a/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift +++ b/SessionUtilitiesKit/Database/Utilities/Database+Utilities.swift @@ -36,4 +36,94 @@ public extension Database { sqlite3_interrupt(sqliteConnection) } + + /// This is a custom implementation of the `afterNextTransaction` method which executes the closures within their own + /// transactions to allow for nesting of 'afterNextTransaction' actions + /// + /// **Note:** GRDB doesn't notify read-only transactions to transaction observers + func afterNextTransactionNested( + onCommit: @escaping (Database) -> Void, + onRollback: @escaping (Database) -> Void = { _ in } + ) { + afterNextTransactionNestedOnce( + dedupeId: UUID().uuidString, + onCommit: onCommit, + onRollback: onRollback + ) + } + + func afterNextTransactionNestedOnce( + dedupeId: String, + onCommit: @escaping (Database) -> Void, + onRollback: @escaping (Database) -> Void = { _ in } + ) { + // Only allow a single observer per `dedupeId` per transaction, this allows us to + // schedule an action to run at most once per transaction (eg. auto-scheduling a ConfigSyncJob + // when receiving messages) + guard !TransactionHandler.registeredHandlers.wrappedValue.contains(dedupeId) else { + return + } + + add( + transactionObserver: TransactionHandler( + identifier: dedupeId, + onCommit: onCommit, + onRollback: onRollback + ), + extent: .nextTransaction + ) + } } + +fileprivate class TransactionHandler: TransactionObserver { + static var registeredHandlers: Atomic> = Atomic([]) + + let identifier: String + let onCommit: (Database) -> Void + let onRollback: (Database) -> Void + + init( + identifier: String, + onCommit: @escaping (Database) -> Void, + onRollback: @escaping (Database) -> Void + ) { + self.identifier = identifier + self.onCommit = onCommit + self.onRollback = onRollback + + TransactionHandler.registeredHandlers.mutate { $0.insert(identifier) } + } + + // Ignore changes + func observes(eventsOfKind eventKind: DatabaseEventKind) -> Bool { false } + func databaseDidChange(with event: DatabaseEvent) { } + + func databaseDidCommit(_ db: Database) { + TransactionHandler.registeredHandlers.mutate { $0.remove(identifier) } + + do { + try db.inTransaction { + onCommit(db) + return .commit + } + } + catch { + SNLog("[Database] afterNextTransactionNested onCommit failed") + } + } + + func databaseDidRollback(_ db: Database) { + TransactionHandler.registeredHandlers.mutate { $0.remove(identifier) } + + do { + try db.inTransaction { + onRollback(db) + return .commit + } + } + catch { + SNLog("[Database] afterNextTransactionNested onRollback failed") + } + } +} + diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 7d604f0ba..fafb086d2 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -2,7 +2,6 @@ import Foundation import GRDB -import SignalCoreKit public protocol JobExecutor { /// The maximum number of times the job can fail before it fails permanently @@ -29,9 +28,10 @@ public protocol JobExecutor { static func run( _ job: Job, queue: DispatchQueue, - success: @escaping (Job, Bool) -> (), - failure: @escaping (Job, Error?, Bool) -> (), - deferred: @escaping (Job) -> () + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies ) } @@ -43,165 +43,103 @@ public final class JobRunner { case notFound } - private static let blockingQueue: Atomic = Atomic( - JobQueue( - type: .blocking, - qos: .default, - jobVariants: [], - onQueueDrained: { - // Once all blocking jobs have been completed we want to start running - // the remaining job queues - queues.wrappedValue.forEach { _, queue in queue.start() } - } - ) - ) - private static let queues: Atomic<[Job.Variant: JobQueue]> = { + // MARK: - Variables + + private let blockingQueue: Atomic + private let queues: Atomic<[Job.Variant: JobQueue]> + + internal var perSessionJobsCompleted: Atomic> = Atomic([]) + internal var hasCompletedInitialBecomeActive: Atomic = Atomic(false) + internal var shutdownBackgroundTask: Atomic = Atomic(nil) + internal var canStartQueues: Atomic = Atomic(false) + + // MARK: - Initialization + + init(dependencies: Dependencies = Dependencies()) { var jobVariants: Set = Job.Variant.allCases.asSet() - let messageSendQueue: JobQueue = JobQueue( - type: .messageSend, - executionType: .concurrent, // Allow as many jobs to run at once as supported by the device - qos: .default, - jobVariants: [ - jobVariants.remove(.attachmentUpload), - jobVariants.remove(.messageSend), - jobVariants.remove(.notifyPushServer), - jobVariants.remove(.sendReadReceipts) - ].compactMap { $0 } + self.blockingQueue = Atomic( + JobQueue( + type: .blocking, + qos: .default, + jobVariants: [], + onQueueDrained: { + // Once all blocking jobs have been completed we want to start running + // the remaining job queues + JobRunner.startNonBlockingQueues(dependencies: dependencies) + } + ) ) - let messageReceiveQueue: JobQueue = JobQueue( - type: .messageReceive, - // Explicitly serial as executing concurrently means message receives getting processed at - // different speeds which can result in: - // • Small batches of messages appearing in the UI before larger batches - // • Closed group messages encrypted with updated keys could start parsing before it's key - // update message has been processed (ie. guaranteed to fail) - executionType: .serial, - qos: .default, - jobVariants: [ - jobVariants.remove(.messageReceive) - ].compactMap { $0 } - ) - let attachmentDownloadQueue: JobQueue = JobQueue( - type: .attachmentDownload, - qos: .utility, - jobVariants: [ - jobVariants.remove(.attachmentDownload) - ].compactMap { $0 } - ) - let generalQueue: JobQueue = JobQueue( - type: .general(number: 0), - qos: .utility, - jobVariants: Array(jobVariants) - ) - - return Atomic([ - messageSendQueue, - messageReceiveQueue, - attachmentDownloadQueue, - generalQueue + self.queues = Atomic([ + // MARK: -- Message Send Queue + + JobQueue( + type: .messageSend, + executionType: .concurrent, // Allow as many jobs to run at once as supported by the device + qos: .default, + jobVariants: [ + jobVariants.remove(.attachmentUpload), + jobVariants.remove(.messageSend), + jobVariants.remove(.notifyPushServer), + jobVariants.remove(.sendReadReceipts) + ].compactMap { $0 } + ), + + // MARK: -- Message Receive Queue + + JobQueue( + type: .messageReceive, + // Explicitly serial as executing concurrently means message receives getting processed at + // different speeds which can result in: + // • Small batches of messages appearing in the UI before larger batches + // • Closed group messages encrypted with updated keys could start parsing before it's key + // update message has been processed (ie. guaranteed to fail) + executionType: .serial, + qos: .default, + jobVariants: [ + jobVariants.remove(.messageReceive) + ].compactMap { $0 } + ), + + // MARK: -- Attachment Download Queue + + JobQueue( + type: .attachmentDownload, + qos: .utility, + jobVariants: [ + jobVariants.remove(.attachmentDownload) + ].compactMap { $0 } + ), + + // MARK: -- General Queue + + JobQueue( + type: .general(number: 0), + qos: .utility, + jobVariants: Array(jobVariants) + ) ].reduce(into: [:]) { prev, next in next.jobVariants.forEach { variant in prev[variant] = next } }) - }() - - internal static var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) - fileprivate static var perSessionJobsCompleted: Atomic> = Atomic([]) - private static var hasCompletedInitialBecomeActive: Atomic = Atomic(false) - private static var shutdownBackgroundTask: Atomic = Atomic(nil) - fileprivate static var canStartQueues: Atomic = Atomic(false) + } // MARK: - Configuration - public static func add(executor: JobExecutor.Type, for variant: Job.Variant) { - executorMap.mutate { $0[variant] = executor } + internal func add(executor: JobExecutor.Type, for variant: Job.Variant) { + queues.wrappedValue[variant]?.addExecutor(executor, for: variant) } // MARK: - Execution - /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - public static func add(_ db: Database, job: Job?, canStartJob: Bool = true) { - // Store the job into the database (getting an id for it) - guard let updatedJob: Job = try? job?.inserted(db) else { - SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") - return - } - guard !canStartJob || updatedJob.id != nil else { - SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") - return - } - - queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } - - // Don't start the queue if the job can't be started - guard canStartJob else { return } - - // Start the job runner if needed - db.afterNextTransaction { _ in - queues.wrappedValue[updatedJob.variant]?.start() - } - } - - /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start - /// the JobRunner - /// - /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` - /// is in the future then the job won't be started - public static func upsert(_ db: Database, job: Job?, canStartJob: Bool = true) { - guard let job: Job = job else { return } // Ignore null jobs - guard job.id != nil else { - add(db, job: job, canStartJob: canStartJob) - return - } - - queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) - - // Don't start the queue if the job can't be started - guard canStartJob else { return } - - // Start the job runner if needed - db.afterNextTransaction { _ in - queues.wrappedValue[job.variant]?.start() - } - } - - @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? { - switch job?.behaviour { - case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: - SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")") - return nil - - default: break - } - - // Store the job into the database (getting an id for it) - guard let updatedJob: Job = try? job?.inserted(db) else { - SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") - return nil - } - guard let jobId: Int64 = updatedJob.id else { - SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") - return nil - } - - queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) - - return (jobId, updatedJob) - } - - public static func appDidFinishLaunching() { + internal func appDidFinishLaunching(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - JobRunner.canStartQueues.mutate { $0 = true } + canStartQueues.mutate { $0 = true } // Note: 'appDidBecomeActive' will run on first launch anyway so we can // leave those jobs out and can wait until then to start the JobRunner - let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = Storage.shared + let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = dependencies.storage .read { db in let blockingJobs: [Job] = try Job .filter( @@ -231,7 +169,11 @@ public final class JobRunner { guard !jobsToRun.blocking.isEmpty || !jobsToRun.nonBlocking.isEmpty else { return } // Add and start any blocking jobs - blockingQueue.wrappedValue?.appDidFinishLaunching(with: jobsToRun.blocking, canStart: true) + blockingQueue.wrappedValue?.appDidFinishLaunching( + with: jobsToRun.blocking, + canStart: true, + dependencies: dependencies + ) // Add any non-blocking jobs (we don't start these incase there are blocking "on active" // jobs as well) @@ -239,13 +181,13 @@ public final class JobRunner { let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue jobsByVariant.forEach { variant, jobs in - jobQueues[variant]?.appDidFinishLaunching(with: jobs, canStart: false) + jobQueues[variant]?.appDidFinishLaunching(with: jobs, canStart: false, dependencies: dependencies) } } - public static func appDidBecomeActive() { + internal func appDidBecomeActive(dependencies: Dependencies) { // Flag that the JobRunner can start it's queues - JobRunner.canStartQueues.mutate { $0 = true } + canStartQueues.mutate { $0 = true } // If we have a running "sutdownBackgroundTask" then we want to cancel it as otherwise it // can result in the database being suspended and us being unable to interact with it at all @@ -255,8 +197,8 @@ public final class JobRunner { } // Retrieve any jobs which should run when becoming active - let hasCompletedInitialBecomeActive: Bool = JobRunner.hasCompletedInitialBecomeActive.wrappedValue - let jobsToRun: [Job] = Storage.shared + let hasCompletedInitialBecomeActive: Bool = self.hasCompletedInitialBecomeActive.wrappedValue + let jobsToRun: [Job] = dependencies.storage .read { db in return try Job .filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive) @@ -272,7 +214,7 @@ public final class JobRunner { guard !jobsToRun.isEmpty else { if !blockingQueueIsRunning { - jobQueues.forEach { _, queue in queue.start() } + jobQueues.forEach { _, queue in queue.start(dependencies: dependencies) } } return } @@ -283,23 +225,104 @@ public final class JobRunner { jobQueues.forEach { variant, queue in queue.appDidBecomeActive( with: (jobsByVariant[variant] ?? []), - canStart: !blockingQueueIsRunning + canStart: !blockingQueueIsRunning, + dependencies: dependencies ) } - JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true } + self.hasCompletedInitialBecomeActive.mutate { $0 = true } } - /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run - /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their - /// failure - they _should_ be picked up again the next time the app is launched) - public static func stopAndClearPendingJobs( + internal func add( + _ db: Database, + job: Job?, + canStartJob: Bool, + dependencies: Dependencies + ) { + // Store the job into the database (getting an id for it) + guard let updatedJob: Job = try? job?.inserted(db) else { + SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") + return + } + guard !canStartJob || updatedJob.id != nil else { + SNLog("[JobRunner] Not starting \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return + } + + queues.mutate { + $0[updatedJob.variant]? + .add(updatedJob, canStartJob: canStartJob, dependencies: dependencies) + } + + // Don't start the queue if the job can't be started + guard canStartJob else { return } + + // Start the job runner if needed + db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(updatedJob.variant)") { [weak self] _ in + self?.queues.wrappedValue[updatedJob.variant]?.start(dependencies: dependencies) + } + } + + internal func upsert( + _ db: Database, + job: Job?, + canStartJob: Bool, + dependencies: Dependencies + ) { + guard let job: Job = job else { return } // Ignore null jobs + guard job.id != nil else { + add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) + return + } + + queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob, dependencies: dependencies) + + // Don't start the queue if the job can't be started + guard canStartJob else { return } + + // Start the job runner if needed + db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(job.variant)") { [weak self] _ in + self?.queues.wrappedValue[job.variant]?.start(dependencies: dependencies) + } + } + + @discardableResult internal func insert( + _ db: Database, + job: Job?, + before otherJob: Job, + dependencies: Dependencies + ) -> (Int64, Job)? { + switch job?.behaviour { + case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: + SNLog("[JobRunner] Attempted to insert \(job.map { "\($0.variant)" } ?? "unknown") job before the current one even though it's behaviour is \(job.map { "\($0.behaviour)" } ?? "unknown")") + return nil + + default: break + } + + // Store the job into the database (getting an id for it) + guard let updatedJob: Job = try? job?.inserted(db) else { + SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job") + return nil + } + guard let jobId: Int64 = updatedJob.id else { + SNLog("[JobRunner] Unable to add \(job.map { "\($0.variant)" } ?? "unknown") job due to missing id") + return nil + } + + queues.wrappedValue[updatedJob.variant]? + .insert(updatedJob, before: otherJob, dependencies: dependencies) + + return (jobId, updatedJob) + } + + internal func stopAndClearPendingJobs( exceptForVariant: Job.Variant? = nil, onComplete: (() -> ())? = nil ) { // Inform the JobRunner that it can't start any queues (this is to prevent queues from // rescheduling themselves while in the background, when the app restarts or becomes active // the JobRunenr will update this flag) - JobRunner.canStartQueues.mutate { $0 = false } + canStartQueues.mutate { $0 = false } // Stop all queues except for the one containing the `exceptForVariant` queues.wrappedValue @@ -341,27 +364,27 @@ public final class JobRunner { } // Add a callback to be triggered once the queue is drained - queue.onQueueDrained = { [weak queue] in + queue.onQueueDrained = { [weak self, weak queue] in oldQueueDrained?() queue?.onQueueDrained = oldQueueDrained onComplete?() - shutdownBackgroundTask.mutate { $0 = nil } + self?.shutdownBackgroundTask.mutate { $0 = nil } } } - public static func isCurrentlyRunning(_ job: Job?) -> Bool { + internal func isCurrentlyRunning(_ job: Job?) -> Bool { guard let job: Job = job, let jobId: Int64 = job.id else { return false } return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) } - public static func defailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { + internal func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) .defaulting(to: [:]) } - public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { + internal func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else { callback(.notFound) return @@ -370,14 +393,14 @@ public final class JobRunner { queue.afterCurrentlyRunningJob(jobId, callback: callback) } - public static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { + internal func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } return targetQueue.hasPendingOrRunningJob(with: detailsData) } - public static func removePendingJob(_ job: Job?) { + internal func removePendingJob(_ job: Job?) { guard let job: Job = job, let jobId: Int64 = job.id else { return } queues.wrappedValue[job.variant]?.removePendingJob(jobId) @@ -398,6 +421,97 @@ public final class JobRunner { } } +// MARK: - JobRunner Singleton + +public extension JobRunner { + private static let instance: JobRunner = JobRunner() + + // MARK: - Static Access + + static func add(executor: JobExecutor.Type, for variant: Job.Variant) { + instance.add(executor: executor, for: variant) + } + + static func appDidFinishLaunching(dependencies: Dependencies = Dependencies()) { + instance.appDidFinishLaunching(dependencies: dependencies) + } + + static func appDidBecomeActive(dependencies: Dependencies = Dependencies()) { + instance.appDidBecomeActive(dependencies: dependencies) + } + + /// Add a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func add( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.add(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start + /// the JobRunner + /// + /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` + /// is in the future then the job won't be started + static func upsert( + _ db: Database, + job: Job?, + canStartJob: Bool = true, + dependencies: Dependencies = Dependencies() + ) { instance.upsert(db, job: job, canStartJob: canStartJob, dependencies: dependencies) } + + @discardableResult static func insert( + _ db: Database, + job: Job?, + before otherJob: Job, + dependencies: Dependencies = Dependencies() + ) -> (Int64, Job)? { instance.insert(db, job: job, before: otherJob, dependencies: dependencies) } + + /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run + /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their + /// failure - they _should_ be picked up again the next time the app is launched) + static func stopAndClearPendingJobs( + exceptForVariant: Job.Variant? = nil, + onComplete: (() -> ())? = nil + ) { instance.stopAndClearPendingJobs(exceptForVariant: exceptForVariant, onComplete: onComplete) } + + static func isCurrentlyRunning(_ job: Job?) -> Bool { + return instance.isCurrentlyRunning(job) + } + + static func detailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { + return instance.detailsForCurrentlyRunningJobs(of: variant) + } + + static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) { + instance.afterCurrentlyRunningJob(job, callback: callback) + } + + static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { + return instance.hasPendingOrRunningJob(with: variant, details: details) + } + + static func removePendingJob(_ job: Job?) { + instance.removePendingJob(job) + } + + // MARK: - Internal Static Access + + fileprivate static func canStart(queue: JobQueue) -> Bool { + return instance.canStartQueues.wrappedValue + } + + fileprivate static func startNonBlockingQueues(dependencies: Dependencies) { + instance.queues.wrappedValue.forEach { _, queue in + queue.start(dependencies: dependencies) + } + } +} + // MARK: - JobQueue private final class JobQueue { @@ -433,7 +547,11 @@ private final class JobQueue { private var timer: Timer? fileprivate var fireTimestamp: TimeInterval = 0 - static func create(queue: JobQueue, timestamp: TimeInterval) -> Trigger? { + static func create( + queue: JobQueue, + timestamp: TimeInterval, + dependencies: Dependencies + ) -> Trigger? { /// Setup the trigger (wait at least 1 second before triggering) /// /// **Note:** We use the `Timer.scheduledTimerOnMainThread` method because running a timer @@ -445,7 +563,7 @@ private final class JobQueue { withTimeInterval: trigger.fireTimestamp, repeats: false, block: { [weak queue] _ in - queue?.start() + queue?.start(dependencies: dependencies) } ) @@ -485,6 +603,7 @@ private final class JobQueue { return result }() + private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:]) private var nextTrigger: Atomic = Atomic(nil) fileprivate var isRunning: Atomic = Atomic(false) private var queue: Atomic<[Job]> = Atomic([]) @@ -512,9 +631,15 @@ private final class JobQueue { self.onQueueDrained = onQueueDrained } + // MARK: - Configuration + + fileprivate func addExecutor(_ executor: JobExecutor.Type, for variant: Job.Variant) { + executorMap.mutate { $0[variant] = executor } + } + // MARK: - Execution - fileprivate func add(_ job: Job, canStartJob: Bool = true) { + fileprivate func add(_ job: Job, canStartJob: Bool = true, dependencies: Dependencies) { // Check if the job should be added to the queue guard canStartJob, @@ -534,7 +659,7 @@ private final class JobQueue { /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started - fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { + fileprivate func upsert(_ job: Job, canStartJob: Bool = true, dependencies: Dependencies) { guard let jobId: Int64 = job.id else { SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return @@ -557,10 +682,10 @@ private final class JobQueue { // If we didn't update an existing job then we need to add it to the queue guard !didUpdateExistingJob else { return } - add(job, canStartJob: canStartJob) + add(job, canStartJob: canStartJob, dependencies: dependencies) } - fileprivate func insert(_ job: Job, before otherJob: Job) { + fileprivate func insert(_ job: Job, before otherJob: Job, dependencies: Dependencies) { guard job.id != nil else { SNLog("[JobRunner] Prevented attempt to insert \(job.variant) job without id to queue") return @@ -580,16 +705,24 @@ private final class JobQueue { } } - fileprivate func appDidFinishLaunching(with jobs: [Job], canStart: Bool) { + fileprivate func appDidFinishLaunching( + with jobs: [Job], + canStart: Bool, + dependencies: Dependencies + ) { queue.mutate { $0.append(contentsOf: jobs) } // Start the job runner if needed if canStart && !isRunning.wrappedValue { - start() + start(dependencies: dependencies) } } - fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { + fileprivate func appDidBecomeActive( + with jobs: [Job], + canStart: Bool, + dependencies: Dependencies + ) { queue.mutate { queue in // Avoid re-adding jobs to the queue that are already in it (this can // happen if the user sends the app to the background before the 'onActive' @@ -602,7 +735,7 @@ private final class JobQueue { // Start the job runner if needed if canStart && !isRunning.wrappedValue { - start() + start(dependencies: dependencies) } } @@ -639,17 +772,24 @@ private final class JobQueue { // MARK: - Job Running - fileprivate func start(force: Bool = false) { + fileprivate func start( + force: Bool = false, + dependencies: Dependencies + ) { // We only want the JobRunner to run in the main app - guard CurrentAppContext().isMainApp else { return } - guard JobRunner.canStartQueues.wrappedValue else { return } + guard + HasAppContext() && + CurrentAppContext().isMainApp && + !CurrentAppContext().isRunningTests && + JobRunner.canStart(queue: self) + else { return } guard force || !isRunning.wrappedValue else { return } // The JobRunner runs synchronously we need to ensure this doesn't start // on the main thread (if it is on the main thread then swap to a different thread) guard DispatchQueue.getSpecific(key: queueKey) == queueContext else { internalQueue.async { [weak self] in - self?.start() + self?.start(dependencies: dependencies) } return } @@ -665,7 +805,7 @@ private final class JobQueue { // Get any pending jobs let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() - let jobsToRun: [Job] = Storage.shared.read { db in + let jobsToRun: [Job] = dependencies.storage.read { db in try Job .filterPendingJobs( variants: jobVariants, @@ -691,7 +831,7 @@ private final class JobQueue { guard jobCount > 0 else { if jobIdsAlreadyRunning.isEmpty { isRunning.mutate { $0 = false } - scheduleNextSoonestJob() + scheduleNextSoonestJob(dependencies: dependencies) } return } @@ -700,7 +840,7 @@ private final class JobQueue { if !wasAlreadyRunning { SNLog("[JobRunner] Starting \(queueContext) with (\(jobCount) job\(jobCount != 1 ? "s" : ""))") } - runNextJob() + runNextJob(dependencies: dependencies) } fileprivate func stopAndClearPendingJobs() { @@ -709,14 +849,14 @@ private final class JobQueue { deferLoopTracker.mutate { $0 = [:] } } - private func runNextJob() { + private func runNextJob(dependencies: Dependencies) { // Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job) guard isRunning.wrappedValue else { return } // Ensure this is running on the correct queue guard DispatchQueue.getSpecific(key: queueKey) == queueContext else { internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } return } @@ -728,38 +868,58 @@ private final class JobQueue { // Always attempt to schedule the next soonest job (otherwise if enough jobs get started in rapid // succession then pending/failed jobs in the database may never get re-started in a concurrent queue) - scheduleNextSoonestJob() + scheduleNextSoonestJob(dependencies: dependencies) return } - guard let jobExecutor: JobExecutor.Type = JobRunner.executorMap.wrappedValue[nextJob.variant] else { + guard let jobExecutor: JobExecutor.Type = executorMap.wrappedValue[nextJob.variant] else { SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing executor") - handleJobFailed(nextJob, error: JobRunnerError.executorMissing, permanentFailure: true) + handleJobFailed( + nextJob, + error: JobRunnerError.executorMissing, + permanentFailure: true, + dependencies: dependencies + ) return } guard !jobExecutor.requiresThreadId || nextJob.threadId != nil else { SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required threadId") - handleJobFailed(nextJob, error: JobRunnerError.requiredThreadIdMissing, permanentFailure: true) + handleJobFailed( + nextJob, + error: JobRunnerError.requiredThreadIdMissing, + permanentFailure: true, + dependencies: dependencies + ) return } guard !jobExecutor.requiresInteractionId || nextJob.interactionId != nil else { SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing required interactionId") - handleJobFailed(nextJob, error: JobRunnerError.requiredInteractionIdMissing, permanentFailure: true) + handleJobFailed( + nextJob, + error: JobRunnerError.requiredInteractionIdMissing, + permanentFailure: true, + dependencies: dependencies + ) return } guard nextJob.id != nil else { SNLog("[JobRunner] \(queueContext) Unable to run \(nextJob.variant) job due to missing id") - handleJobFailed(nextJob, error: JobRunnerError.jobIdMissing, permanentFailure: false) + handleJobFailed( + nextJob, + error: JobRunnerError.jobIdMissing, + permanentFailure: false, + dependencies: dependencies + ) return } // If the 'nextRunTimestamp' for the job is in the future then don't run it yet guard nextJob.nextRunTimestamp <= Date().timeIntervalSince1970 else { - handleJobDeferred(nextJob) + handleJobDeferred(nextJob, dependencies: dependencies) return } // Check if the next job has any dependencies - let dependencyInfo: (expectedCount: Int, jobs: [Job]) = Storage.shared.read { db in + let dependencyInfo: (expectedCount: Int, jobs: [Job]) = dependencies.storage.read { db in let numExpectedDependencies: Int = try JobDependencies .filter(JobDependencies.Columns.jobId == nextJob.id) .fetchCount(db) @@ -771,7 +931,12 @@ private final class JobQueue { guard dependencyInfo.jobs.count == dependencyInfo.expectedCount else { SNLog("[JobRunner] \(queueContext) found job with missing dependencies, removing the job") - handleJobFailed(nextJob, error: JobRunnerError.missingDependencies, permanentFailure: true) + handleJobFailed( + nextJob, + error: JobRunnerError.missingDependencies, + permanentFailure: true, + dependencies: dependencies + ) return } guard dependencyInfo.jobs.isEmpty else { @@ -792,7 +957,7 @@ private final class JobQueue { ) queue.append(nextJob) } - handleJobDeferred(nextJob) + handleJobDeferred(nextJob, dependencies: dependencies) return } @@ -810,7 +975,7 @@ private final class JobQueue { } } - handleJobDeferred(nextJob) + handleJobDeferred(nextJob, dependencies: dependencies) return } @@ -831,26 +996,45 @@ private final class JobQueue { detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") + /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to + /// the queue which means an odd situation can occasionally occur where the `finished` event can actually run before the `output` + /// event - this can result in unexpected behaviours (for more information see https://github.com/groue/GRDB.swift/issues/1334) + /// + /// Due to this if a job is meant to run on a concurrent queue then we actually want to create a temporary serial queue just for the execution + /// of that job + let targetQueue: DispatchQueue = { + guard executionType == .concurrent else { return internalQueue } + + return DispatchQueue( + label: "\(self.queueContext)-serial", + qos: self.qosClass, + attributes: [], + autoreleaseFrequency: .inherit, + target: nil + ) + }() + jobExecutor.run( nextJob, - queue: internalQueue, + queue: targetQueue, success: handleJobSucceeded, failure: handleJobFailed, - deferred: handleJobDeferred + deferred: handleJobDeferred, + dependencies: dependencies ) // If this queue executes concurrently and there are still jobs remaining then immediately attempt // to start the next job if executionType == .concurrent && numJobsRemaining > 0 { internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } } } - private func scheduleNextSoonestJob() { + private func scheduleNextSoonestJob(dependencies: Dependencies) { let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue - let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in + let nextJobTimestamp: TimeInterval? = dependencies.storage.read { db in try Job .filterPendingJobs( variants: jobVariants, @@ -865,7 +1049,7 @@ private final class JobQueue { // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // the 'onQueueDrained' callback and stop - guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else { + guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStart(queue: self) else { if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { self.onQueueDrained?() } @@ -889,7 +1073,7 @@ private final class JobQueue { // queue (for concurrent queues we want to force them to load in pending jobs and add // them to the queue regardless of whether the queue is already running) internalQueue.async { [weak self] in - self?.start(force: (self?.executionType == .concurrent)) + self?.start(force: (self?.executionType == .concurrent), dependencies: dependencies) } return } @@ -901,17 +1085,21 @@ private final class JobQueue { SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")") nextTrigger.mutate { trigger in trigger?.invalidate() // Need to invalidate the old trigger to prevent a memory leak - trigger = Trigger.create(queue: self, timestamp: nextJobTimestamp) + trigger = Trigger.create(queue: self, timestamp: nextJobTimestamp, dependencies: dependencies) } } // MARK: - Handling Results /// This function is called when a job succeeds - private func handleJobSucceeded(_ job: Job, shouldStop: Bool) { + private func handleJobSucceeded( + _ job: Job, + shouldStop: Bool, + dependencies: Dependencies + ) { switch job.behaviour { case .runOnce, .runOnceNextLaunch: - Storage.shared.write { db in + dependencies.storage.write { db in // First remove any JobDependencies requiring this job to be completed (if // we don't then the dependant jobs will automatically be deleted) _ = try JobDependencies @@ -922,7 +1110,7 @@ private final class JobQueue { } case .recurring where shouldStop == true: - Storage.shared.write { db in + dependencies.storage.write { db in // First remove any JobDependencies requiring this job to be completed (if // we don't then the dependant jobs will automatically be deleted) _ = try JobDependencies @@ -938,7 +1126,7 @@ private final class JobQueue { case .recurring where job.nextRunTimestamp <= Date().timeIntervalSince1970: guard let jobId: Int64 = job.id else { break } - Storage.shared.write { db in + dependencies.storage.write { db in _ = try Job .filter(id: jobId) .updateAll( @@ -958,7 +1146,7 @@ private final class JobQueue { job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude else { break } - Storage.shared.write { db in + dependencies.storage.write { db in _ = try Job .filter(id: jobId) .updateAll( @@ -974,7 +1162,7 @@ private final class JobQueue { // For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other // dependencies they will be removed again when they try to execute) if executionType == .concurrent { - let dependantJobs: [Job] = Storage.shared + let dependantJobs: [Job] = dependencies.storage .read { db in try job.dependantJobs.fetchAll(db) } .defaulting(to: []) let dependantJobIds: [Int64] = dependantJobs @@ -997,19 +1185,24 @@ private final class JobQueue { // Perform job cleanup and start the next job performCleanUp(for: job, result: .succeeded) internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } } /// This function is called when a job fails, if it's wasn't a permanent failure then the 'failureCount' for the job will be incremented and it'll /// be re-run after a retry interval has passed - private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) { + private func handleJobFailed( + _ job: Job, + error: Error?, + permanentFailure: Bool, + dependencies: Dependencies + ) { guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else { SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled") performCleanUp(for: job, result: .failed) internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } return } @@ -1040,13 +1233,13 @@ private final class JobQueue { } internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } return } // Get the max failure count for the job (a value of '-1' means it will retry indefinitely) - let maxFailureCount: Int = (JobRunner.executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0) + let maxFailureCount: Int = (executorMap.wrappedValue[job.variant]?.maxFailureCount ?? 0) let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) Storage.shared.write { db in @@ -1116,13 +1309,16 @@ private final class JobQueue { performCleanUp(for: job, result: .failed) internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } } /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant /// on other jobs, and it should automatically manage those dependencies) - private func handleJobDeferred(_ job: Job) { + private func handleJobDeferred( + _ job: Job, + dependencies: Dependencies + ) { var stuckInDeferLoop: Bool = false deferLoopTracker.mutate { @@ -1160,13 +1356,18 @@ private final class JobQueue { // more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds) guard !stuckInDeferLoop else { deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) } - handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false) + handleJobFailed( + job, + error: JobRunnerError.possibleDeferralLoop, + permanentFailure: false, + dependencies: dependencies + ) return } performCleanUp(for: job, result: .deferred) internalQueue.async { [weak self] in - self?.runNextJob() + self?.runNextJob(dependencies: dependencies) } } diff --git a/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift new file mode 100644 index 000000000..b6dfcc2d7 --- /dev/null +++ b/SessionUtilitiesKitTests/JobRunner/JobRunnerSpec.swift @@ -0,0 +1,68 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB + +import Quick +import Nimble + +@testable import SessionUtilitiesKit + +class JobRunnerSpec: QuickSpec { + public enum TestSuccessfulJob: JobExecutor { + static let maxFailureCount: Int = 0 + static let requiresThreadId: Bool = false + static let requiresInteractionId: Bool = false + + static func run( + _ job: Job, + queue: DispatchQueue, + success: @escaping (Job, Bool, Dependencies) -> (), + failure: @escaping (Job, Error?, Bool, Dependencies) -> (), + deferred: @escaping (Job, Dependencies) -> (), + dependencies: Dependencies + ) { + success(job, true, dependencies) + } + } + + // MARK: - Spec + + override func spec() { + var jobRunner: JobRunner! + var mockStorage: Storage! + var dependencies: Dependencies! + + // MARK: - JobRunner + + describe("a JobRunner") { + beforeEach { + mockStorage = Storage( + customWriter: try! DatabaseQueue(), + customMigrations: [ + SNUtilitiesKit.migrations() + ] + ) + dependencies = Dependencies( + storage: mockStorage, + date: Date(timeIntervalSince1970: 1234567890) + ) + + jobRunner = JobRunner() + } + + afterEach { + jobRunner = nil + mockStorage = nil + dependencies = nil + } + + context("when configuring") { + it("adds an executor correctly") { + // TODO: Test this + jobRunner.add(executor: TestSuccessfulJob.self, for: .messageSend) + } + } + } + } +}