JobRunner fixes

Updated the DataExtractionNotification to take a 'sentTimestamp' when created to reduce the chance for duplicates being sent
Fixed an issue where the 'hasPendingOrRunningJob' check didn't actually include running jobs
Fixed some odd behaviours with job dependencies
Fixed an incorrect failure count check
This commit is contained in:
Morgan Pretty 2023-04-05 17:19:21 +10:00
parent 2568d50835
commit 437ec27d72
7 changed files with 79 additions and 93 deletions

View File

@ -2016,7 +2016,8 @@ extension ConversationVC:
try MessageSender.send(
db,
message: DataExtractionNotification(
kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs))
kind: .mediaSaved(timestamp: UInt64(cellViewModel.timestampMs)),
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil,
in: thread
@ -2270,7 +2271,8 @@ extension ConversationVC:
try MessageSender.send(
db,
message: DataExtractionNotification(
kind: .screenshot
kind: .screenshot,
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil,
in: thread

View File

@ -540,7 +540,8 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou
message: DataExtractionNotification(
kind: .mediaSaved(
timestamp: UInt64(currentViewController.galleryItem.interactionTimestampMs)
)
),
sentTimestamp: UInt64(SnodeAPI.currentOffsetTimestampMs())
),
interactionId: nil, // Show no interaction for the current user
in: thread

View File

@ -27,8 +27,13 @@ public final class DataExtractionNotification: ControlMessage {
// MARK: - Initialization
public init(kind: Kind) {
super.init()
public init(
kind: Kind,
sentTimestamp: UInt64? = nil
) {
super.init(
sentTimestamp: sentTimestamp
)
self.kind = kind
}

View File

@ -3,7 +3,7 @@
import Foundation
import GRDB
public struct Job: Codable, Equatable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord, MutablePersistableRecord, TableRecord, ColumnExpressible {
public static var databaseTableName: String { "job" }
internal static let dependencyForeignKey = ForeignKey([Columns.id], to: [JobDependencies.Columns.dependantId])
public static let dependantJobDependency = hasMany(

View File

@ -3,7 +3,7 @@
import Foundation
import GRDB
public struct JobDependencies: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
public static var databaseTableName: String { "jobDependencies" }
internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id])
internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id])

View File

@ -45,6 +45,14 @@ public extension Array {
return updatedArray
}
func inserting(contentsOf other: [Element]?, at index: Int) -> [Element] {
guard let other: [Element] = other else { return self }
var updatedArray: [Element] = self
updatedArray.insert(contentsOf: other, at: 0)
return updatedArray
}
func grouped<Key: Hashable>(by keyForValue: (Element) throws -> Key) -> [Key: [Element]] {
return ((try? Dictionary(grouping: self, by: keyForValue)) ?? [:])
}

View File

@ -627,9 +627,13 @@ private final class JobQueue {
}
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
guard let detailsData: Data = detailsData else { return false }
let pendingJobs: [Job] = queue.wrappedValue
return pendingJobs.contains { job in job.details == detailsData }
guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true }
return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData)
}
fileprivate func removePendingJob(_ jobId: Int64) {
@ -760,13 +764,15 @@ private final class JobQueue {
}
// Check if the next job has any dependencies
let dependencyInfo: (expectedCount: Int, jobs: [Job]) = Storage.shared.read { db in
let numExpectedDependencies: Int = try JobDependencies
let dependencyInfo: (expectedCount: Int, jobs: Set<Job>) = Storage.shared.read { db in
let expectedDependencies: Set<JobDependencies> = try JobDependencies
.filter(JobDependencies.Columns.jobId == nextJob.id)
.fetchCount(db)
let jobDependencies: [Job] = try nextJob.dependencies.fetchAll(db)
.fetchSet(db)
let jobDependencies: Set<Job> = try Job
.filter(ids: expectedDependencies.compactMap { $0.dependantId })
.fetchSet(db)
return (numExpectedDependencies, jobDependencies)
return (expectedDependencies.count, jobDependencies)
}
.defaulting(to: (0, []))
@ -778,39 +784,15 @@ private final class JobQueue {
guard dependencyInfo.jobs.isEmpty else {
SNLog("[JobRunner] \(queueContext) found job with \(dependencyInfo.jobs.count) dependencies, running those first")
let jobDependencyIds: [Int64] = dependencyInfo.jobs
.compactMap { $0.id }
let jobIdsNotInQueue: Set<Int64> = jobDependencyIds
.asSet()
.subtracting(queue.wrappedValue.compactMap { $0.id })
// If there are dependencies which aren't in the queue we should just append them
guard !jobIdsNotInQueue.isEmpty else {
queue.mutate { queue in
queue.append(
contentsOf: dependencyInfo.jobs
.filter { jobIdsNotInQueue.contains($0.id ?? -1) }
)
queue.append(nextJob)
}
handleJobDeferred(nextJob)
return
/// Remove all jobs this one is dependant on from the queue and re-insert them at the start of the queue
///
/// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies
/// are successfully completed
queue.mutate { queue in
queue = queue
.filter { !dependencyInfo.jobs.contains($0) }
.inserting(contentsOf: Array(dependencyInfo.jobs), at: 0)
}
// Otherwise re-add the current job after it's dependencies (if this isn't a concurrent
// queue - don't want to immediately try to start the job again only for it to end up back
// in here)
if executionType != .concurrent {
queue.mutate { queue in
guard let lastDependencyIndex: Int = queue.lastIndex(where: { jobDependencyIds.contains($0.id ?? -1) }) else {
queue.append(nextJob)
return
}
queue.insert(nextJob, at: lastDependencyIndex + 1)
}
}
handleJobDeferred(nextJob)
return
}
@ -910,6 +892,12 @@ private final class JobQueue {
/// This function is called when a job succeeds
private func handleJobSucceeded(_ job: Job, shouldStop: Bool) {
/// Retrieve the dependant jobs first (the `JobDependecies` table has cascading deletion when the original `Job` is
/// removed so we need to retrieve these records before that happens)
let dependantJobs: [Job] = Storage.shared
.read { db in try job.dependantJobs.fetchAll(db) }
.defaulting(to: [])
switch job.behaviour {
case .runOnce, .runOnceNextLaunch:
Storage.shared.write { db in
@ -972,26 +960,17 @@ private final class JobQueue {
default: break
}
// For concurrent queues retrieve any 'dependant' jobs and re-add them here (if they have other
// dependencies they will be removed again when they try to execute)
if executionType == .concurrent {
let dependantJobs: [Job] = Storage.shared
.read { db in try job.dependantJobs.fetchAll(db) }
.defaulting(to: [])
let dependantJobIds: [Int64] = dependantJobs
.compactMap { $0.id }
let jobIdsNotInQueue: Set<Int64> = dependantJobIds
.asSet()
.subtracting(queue.wrappedValue.compactMap { $0.id })
// If there are dependant jobs which aren't in the queue we should just append them
if !jobIdsNotInQueue.isEmpty {
queue.mutate { queue in
queue.append(
contentsOf: dependantJobs
.filter { jobIdsNotInQueue.contains($0.id ?? -1) }
)
}
/// Now that the job has been completed we want to insert any jobs that were dependant on it to the start of the queue (the
/// most likely case is that we want an entire job chain to be completed at the same time rather than being blocked by other
/// unrelated jobs)
///
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies
if !dependantJobs.isEmpty {
queue.mutate { queue in
queue = queue
.filter { !dependantJobs.contains($0) }
.inserting(contentsOf: dependantJobs, at: 0)
}
}
@ -1051,19 +1030,30 @@ private final class JobQueue {
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + JobRunner.getRetryInterval(for: job))
Storage.shared.write { db in
/// Remove any dependant jobs from the queue (shouldn't be in there but filter the queue just in case so we don't try
/// to run a deleted job or get stuck in a loop of trying to run dependencies indefinitely)
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
/// Delete/update the failed jobs and any dependencies
let updatedFailureCount: UInt = (job.failureCount + 1)
guard
!permanentFailure && (
maxFailureCount < 0 ||
job.failureCount + 1 < maxFailureCount
updatedFailureCount <= maxFailureCount
)
else {
SNLog("[JobRunner] \(queueContext) \(job.variant) failed permanently\(maxFailureCount >= 0 ? "; too many retries" : "")")
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
// If the job permanently failed or we have performed all of our retry attempts
// then delete the job and all of it's dependant jobs (it'll probably never succeed)
_ = try job.dependantJobs
@ -1071,13 +1061,6 @@ private final class JobQueue {
_ = try job.delete(db)
// Remove the dependant jobs from the queue (so we don't try to run a deleted job)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
performCleanUp(for: job, result: .failed)
return
}
@ -1086,7 +1069,7 @@ private final class JobQueue {
_ = try job
.with(
failureCount: (job.failureCount + 1),
failureCount: updatedFailureCount,
nextRunTimestamp: nextRunTimestamp
)
.saved(db)
@ -1097,22 +1080,9 @@ private final class JobQueue {
try job.dependantJobs
.updateAll(
db,
Job.Columns.failureCount.set(to: (job.failureCount + 1)),
Job.Columns.failureCount.set(to: updatedFailureCount),
Job.Columns.nextRunTimestamp.set(to: (nextRunTimestamp + (1 / 1000)))
)
let dependantJobIds: [Int64] = try job.dependantJobs
.select(.id)
.asRequest(of: Int64.self)
.fetchAll(db)
// Remove the dependant jobs from the queue (so we don't get stuck in a loop of trying
// to run dependecies indefinitely)
if !dependantJobIds.isEmpty {
queue.mutate { queue in
queue = queue.filter { !dependantJobIds.contains($0.id ?? -1) }
}
}
}
performCleanUp(for: job, result: .failed)