diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 03e300e67..a266e5b83 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -2016,7 +2016,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)) + kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread @@ -2270,7 +2271,8 @@ extension ConversationVC: try MessageSender.send( db, message: DataExtractionNotification( - kind: .screenshot + kind: .screenshot, + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, in: thread diff --git a/Session/Media Viewing & Editing/MediaPageViewController.swift b/Session/Media Viewing & Editing/MediaPageViewController.swift index 49d9374e5..0ffc3cd42 100644 --- a/Session/Media Viewing & Editing/MediaPageViewController.swift +++ b/Session/Media Viewing & Editing/MediaPageViewController.swift @@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou message: DataExtractionNotification( kind: .mediaSaved( timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs) - ) + ), + sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs()) ), interactionId: nil, // Show no interaction for the current user in: thread diff --git a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift index d54549df1..627d58ea8 100644 --- a/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift +++ b/SessionMessagingKit/Messages/Control Messages/DataExtractionNotification.swift @@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage { // MARK: - Initialization - public init(kind: Kind) { - super.init() + public init( + kind: Kind, + sentTimestamp: UInt64? = nil + ) { + super.init( + sentTimestamp: sentTimestamp + ) self.kind = kind } diff --git a/SessionUtilitiesKit/Database/Models/Job.swift b/SessionUtilitiesKit/Database/Models/Job.swift index eb494d3e1..9cdb2e2a7 100644 --- a/SessionUtilitiesKit/Database/Models/Job.swift +++ b/SessionUtilitiesKit/Database/Models/Job.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { +public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "job" } internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId]) public static let dependantJobDependency = hasMany( diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 9cda7ceb1..16201367b 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -3,7 +3,7 @@ import Foundation import GRDB -public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { +public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) diff --git a/SessionUtilitiesKit/General/Array+Utilities.swift b/SessionUtilitiesKit/General/Array+Utilities.swift index cf350e86d..77445b64b 100644 --- a/SessionUtilitiesKit/General/Array+Utilities.swift +++ b/SessionUtilitiesKit/General/Array+Utilities.swift @@ -45,6 +45,14 @@ public extension Array { return updatedArray } + func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] { + guard let other: [Element] = other else { return self } + + var updatedArray: [Element] = self + updatedArray.insert(contentsOf: other, at: 0) + return updatedArray + } + func grouped(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] { return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:]) } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 408904cc5..e1f1408a8 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -627,9 +627,13 @@ private final class JobQueue { } fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { + guard let detailsData: Data = detailsData else { return false } + let pendingJobs: [Job] = queue.wrappedValue - return pendingJobs.contains { job in job.details == detailsData } + guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } + + return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData) } fileprivate func removePendingJob(_ jobId: Int64) { @@ -760,13 +764,15 @@ private final class JobQueue { } // Check if the next job has any dependencies - let dependencyInfo: (expectedCount: Int, jobs: [Job]) = Storage.shared.read { db in - let numExpectedDependencies: Int = try JobDependencies + let dependencyInfo: (expectedCount: Int, jobs: Set) = Storage.shared.read { db in + let expectedDependencies: Set = try JobDependencies .filter(JobDependencies.Columns.jobId == nextJob.id) - .fetchCount(db) - let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db) + .fetchSet(db) + let jobDependencies: Set = try Job + .filter(ids: expectedDependencies.compactMap { $0.dependantId }) + .fetchSet(db) - return (numExpectedDependencies, jobDependencies) + return (expectedDependencies.count, jobDependencies) } .defaulting(to: (0, [])) @@ -778,39 +784,15 @@ private final class JobQueue { guard dependencyInfo.jobs.isEmpty else { SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first") - let jobDependencyIds: [Int64] = dependencyInfo.jobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = jobDependencyIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependencies which aren't in the queue we should just append them - guard !jobIdsNotInQueue.isEmpty else { - queue.mutate { queue in - queue.append( - contentsOf: dependencyInfo.jobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - queue.append(nextJob) - } - handleJobDeferred(nextJob) - return + /// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue + /// + /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies + /// are successfully completed + queue.mutate { queue in + queue = queue + .filter { !dependencyInfo.jobs.contains($0) } + .inserting(contentsOf: Array(dependencyInfo.jobs), at: 0) } - - // Otherwise re-add the current job after it's dependencies (if this isn't a concurrent - // queue - don't want to immediately try to start the job again only for it to end up back - // in here) - if executionType != .concurrent { - queue.mutate { queue in - guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else { - queue.append(nextJob) - return - } - - queue.insert(nextJob, at: lastDependencyIndex + 1) - } - } - handleJobDeferred(nextJob) return } @@ -910,6 +892,12 @@ private final class JobQueue { /// This function is called when a job succeeds private func handleJobSucceeded(_ job: Job, shouldStop: Bool) { + /// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is + /// removed so we need to retrieve these records before that happens) + let dependantJobs: [Job] = Storage.shared + .read { db in try job.dependantJobs.fetchAll(db) } + .defaulting(to: []) + switch job.behaviour { case .runOnce, .runOnceNextLaunch: Storage.shared.write { db in @@ -972,26 +960,17 @@ private final class JobQueue { default: break } - // For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other - // dependencies they will be removed again when they try to execute) - if executionType == .concurrent { - let dependantJobs: [Job] = Storage.shared - .read { db in try job.dependantJobs.fetchAll(db) } - .defaulting(to: []) - let dependantJobIds: [Int64] = dependantJobs - .compactMap { $0.id } - let jobIdsNotInQueue: Set = dependantJobIds - .asSet() - .subtracting(queue.wrappedValue.compactMap { $0.id }) - - // If there are dependant jobs which aren't in the queue we should just append them - if !jobIdsNotInQueue.isEmpty { - queue.mutate { queue in - queue.append( - contentsOf: dependantJobs - .filter { jobIdsNotInQueue.contains($0.id ?? -1) } - ) - } + /// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the + /// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other + /// unrelated jobs) + /// + /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be + /// removed from the queue, replaced by their dependencies + if !dependantJobs.isEmpty { + queue.mutate { queue in + queue = queue + .filter { !dependantJobs.contains($0) } + .inserting(contentsOf: dependantJobs, at: 0) } } @@ -1051,19 +1030,30 @@ private final class JobQueue { let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job)) Storage.shared.write { db in + /// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try + /// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely) + let dependantJobIds: [Int64] = try job.dependantJobs + .select(.id) + .asRequest(of: Int64.self) + .fetchAll(db) + + if !dependantJobIds.isEmpty { + queue.mutate { queue in + queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } + } + } + + /// Delete/update the failed jobs and any dependencies + let updatedFailureCount: UInt = (job.failureCount + 1) + guard !permanentFailure && ( maxFailureCount < 0 || - job.failureCount + 1 < maxFailureCount + updatedFailureCount <= maxFailureCount ) else { SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")") - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - // If the job permanently failed or we have performed all of our retry attempts // then delete the job and all of it's dependant jobs (it'll probably never succeed) _ = try job.dependantJobs @@ -1071,13 +1061,6 @@ private final class JobQueue { _ = try job.delete(db) - // Remove the dependant jobs from the queue (so we don't try to run a deleted job) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } - performCleanUp(for: job, result: .failed) return } @@ -1086,7 +1069,7 @@ private final class JobQueue { _ = try job .with( - failureCount: (job.failureCount + 1), + failureCount: updatedFailureCount, nextRunTimestamp: nextRunTimestamp ) .saved(db) @@ -1097,22 +1080,9 @@ private final class JobQueue { try job.dependantJobs .updateAll( db, - Job.Columns.failureCount.set(to: (job.failureCount + 1)), + Job.Columns.failureCount.set(to: updatedFailureCount), Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000))) ) - - let dependantJobIds: [Int64] = try job.dependantJobs - .select(.id) - .asRequest(of: Int64.self) - .fetchAll(db) - - // Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying - // to run dependecies indefinitely) - if !dependantJobIds.isEmpty { - queue.mutate { queue in - queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) } - } - } } performCleanUp(for: job, result: .failed)