From 437ec27d72e8210426aca86151fb76ee9cf72f9a Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 5 Apr 2023 17:19:21 +1000 Subject: [PATCH] JobRunner fixes Updated the DataExtractionNotification to take a 'sentTimestamp' when created to reduce the chance for duplicates being sent Fixed an issue where the 'hasPendingOrRunningJob' check didn't actually include running jobs Fixed some odd behaviours with job dependencies Fixed an incorrect failure count check --- .../ConversationVC+Interaction.swift | 6 +- .../MediaPageViewController.swift | 3 +- .../DataExtractionNotification.swift | 9 +- SessionUtilitiesKit/Database/Models/Job.swift | 2 +- .../Database/Models/JobDependencies.swift | 2 +- .../General/Array+Utilities.swift | 8 + SessionUtilitiesKit/JobRunner/JobRunner.swift | 142 +++++++----------- 7 files changed, 79 insertions(+), 93 deletions(-) diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 03e300e67..a266e5b83 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -2016,7 +2016,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)) + kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread @@ -2270,7 +2271,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .screenshot + kind: .screenshot, + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread diff --git a/Session/Media Viewing & Editing/MediaPageViewController.swift b/Session/Media Viewing & Editing/MediaPageViewController.swift index 49d9374e5..0ffc3cd42 100644 --- a/Session/Media Viewing & Editing/MediaPageViewController.swift +++ b/Session/Media Viewing & Editing/MediaPageViewController.swift @@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou message: DataExtractionNotification( kind: .mediaSaved( timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs) - ) + ), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, // Show no interaction for the current user in: thread diff --git a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift index d54549df1..627d58ea8 100644 --- a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift +++ b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift @@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage { // MARK: - Initialization - public init(kind: Kind) { - super.init() + public init( + kind: Kind, + sentTimestamp: UInt64? = nil + ) { + super.init( + sentTimestamp: sentTimestamp + ) self.kind = kind } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index eb494d3e1..9cdb2e2a7 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { +public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "job" } internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId]) public static let dependantJobDependency = hasMany( diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 9cda7ceb1..16201367b 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { +public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) diff --git a/SessionUtilitiesKit/General/Array+Utilities.swift b/SessionUtilitiesKit/General/Array+Utilities.swift index cf350e86d..77445b64b 100644 --- a/SessionUtilitiesKit/General/Array+Utilities.swift +++ b/SessionUtilitiesKit/General/Array+Utilities.swift @@ -45,6 +45,14 @@ public extension Array { return updatedArray } + func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] { + guard let other: [Element] = other else { return self } + + var updatedArray: [Element] = self + updatedArray.insert(contentsOf: other, at: 0) + return updatedArray + } + func grouped(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] { return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:]) } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 408904cc5..e1f1408a8 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -627,9 +627,13 @@ private final class JobQueue { } fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { + guard let detailsData: Data = detailsData else { return false } + let pendingJobs: [Job] = queue.wrappedValue - return pendingJobs.contains { job in job.details == detailsData } + guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } + + return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData) } fileprivate func removePendingJob(_ jobId: Int64) { @@ -760,13 +764,15 @@ private final class JobQueue { } // Check if the next job has any dependencies - let dependencyInfo: (expectedCount: Int, jobs: [Job]) = Storage.shared.read { db in - let numExpectedDependencies: Int = try JobDependencies + let dependencyInfo: (expectedCount: Int, jobs: Set) = Storage.shared.read { db in + let expectedDependencies: Set = try JobDependencies .filter(JobDependencies.Columns.jobId == nextJob.id) - .fetchCount(db) - let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db) + .fetchSet(db) + let jobDependencies: Set = try Job + .filter(ids: expectedDependencies.compactMap { $0.dependantId }) + .fetchSet(db) - return (numExpectedDependencies, jobDependencies) + return (expectedDependencies.count, jobDependencies) } .defaulting(to: (0, [])) @@ -778,39 +784,15 @@ private final class JobQueue { guard dependencyInfo.jobs.isEmpty else { SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") - let jobDependencyIds: [Int64] = dependencyInfo.jobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = jobDependencyIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependencies which aren't in the queue we should just append them - guard !jobIdsNotInQueue.isEmpty else { - queue.mutate { queue in - queue.append( - contentsOf: dependencyInfo.jobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - queue.append(nextJob) - } - handleJobDeferred(nextJob) - return + /// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue + /// + /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies + /// are successfully completed + queue.mutate { queue in + queue = queue + .filter { !dependencyInfo.jobs.contains($0) } + .inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) } - - // Otherwise re-add the current job after it's dependencies (if this isn't a concurrent - // queue - don't want to immediately try to start the job again only for it to end up back - // in here) - if executionType != .concurrent { - queue.mutate { queue in - guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else { - queue.append(nextJob) - return - } - - queue.insert(nextJob, at: lastDependencyIndex + 1) - } - } - handleJobDeferred(nextJob) return } @@ -910,6 +892,12 @@ private final class JobQueue { /// This function is called when a job succeeds private func handleJobSucceeded(_ job: Job, shouldStop: Bool) { + /// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is + /// removed so we need to retrieve these records before that happens) + let dependantJobs: [Job] = Storage.shared + .read { db in try job.dependantJobs.fetchAll(db) } + .defaulting(to: []) + switch job.behaviour { case .runOnce, .runOnceNextLaunch: Storage.shared.write { db in @@ -972,26 +960,17 @@ private final class JobQueue { default: break } - // For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other - // dependencies they will be removed again when they try to execute) - if executionType == .concurrent { - let dependantJobs: [Job] = Storage.shared - .read { db in try job.dependantJobs.fetchAll(db) } - .defaulting(to: []) - let dependantJobIds: [Int64] = dependantJobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = dependantJobIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependant jobs which aren't in the queue we should just append them - if !jobIdsNotInQueue.isEmpty { - queue.mutate { queue in - queue.append( - contentsOf: dependantJobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - } + /// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the + /// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other + /// unrelated jobs) + /// + /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be + /// removed from the queue, replaced by their dependencies + if !dependantJobs.isEmpty { + queue.mutate { queue in + queue = queue + .filter { !dependantJobs.contains($0) } + .inserting(contentsOf: dependantJobs, at: 0) } } @@ -1051,19 +1030,30 @@ private final class JobQueue { let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) Storage.shared.write { db in + /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try + /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely) + let dependantJobIds: [Int64] = try job.dependantJobs + .select(.id) + .asRequest(of: Int64.self) + .fetchAll(db) + + if !dependantJobIds.isEmpty { + queue.mutate { queue in + queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } + } + } + + /// Delete/update the failed jobs and any dependencies + let updatedFailureCount: UInt = (job.failureCount + 1) + guard !permanentFailure && ( maxFailureCount < 0 || - job.failureCount + 1 < maxFailureCount + updatedFailureCount <= maxFailureCount ) else { SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")") - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - // If the job permanently failed or we have performed all of our retry attempts // then delete the job and all of it's dependant jobs (it'll probably never succeed) _ = try job.dependantJobs @@ -1071,13 +1061,6 @@ private final class JobQueue { _ = try job.delete(db) - // Remove the dependant jobs from the queue (so we don't try to run a deleted job) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } - performCleanUp(for: job, result: .failed) return } @@ -1086,7 +1069,7 @@ private final class JobQueue { _ = try job .with( - failureCount: (job.failureCount + 1), + failureCount: updatedFailureCount, nextRunTimestamp: nextRunTimestamp ) .saved(db) @@ -1097,22 +1080,9 @@ private final class JobQueue { try job.dependantJobs .updateAll( db, - Job.Columns.failureCount.set(to: (job.failureCount + 1)), + Job.Columns.failureCount.set(to: updatedFailureCount), Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000))) ) - - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - - // Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying - // to run dependecies indefinitely) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } } performCleanUp(for: job, result: .failed)