Updated the JobRunner to wait until the current transaction is done before running a job

This commit is contained in:
Morgan Pretty 2023-05-23 15:02:37 +10:00
parent 9c2ec47557
commit a532976333
1 changed files with 10 additions and 6 deletions

View File

@ -138,7 +138,7 @@ public final class JobRunner {
return
}
queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) }
queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
@ -161,7 +161,7 @@ public final class JobRunner {
return
}
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob)
queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
@ -524,7 +524,7 @@ private final class JobQueue {
// MARK: - Execution
fileprivate func add(_ job: Job, canStartJob: Bool = true) {
fileprivate func add(_ db: Database, job: Job, canStartJob: Bool = true) {
// Check if the job should be added to the queue
guard
canStartJob,
@ -541,7 +541,11 @@ private final class JobQueue {
// If this is a concurrent queue then we should immediately start the next job
guard executionType == .concurrent else { return }
runNextJob()
// Ensure that the database commit has completed and then trigger the next job to run (need
// to ensure any interactions have been correctly inserted first)
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in
self?.runNextJob()
}
}
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
@ -549,7 +553,7 @@ private final class JobQueue {
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
fileprivate func upsert(_ job: Job, canStartJob: Bool = true) {
fileprivate func upsert(_ db: Database, job: Job, canStartJob: Bool = true) {
guard let jobId: Int64 = job.id else {
SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
return
@ -572,7 +576,7 @@ private final class JobQueue {
// If we didn't update an existing job then we need to add it to the queue
guard !didUpdateExistingJob else { return }
add(job, canStartJob: canStartJob)
add(db, job: job, canStartJob: canStartJob)
}
fileprivate func insert(_ job: Job, before otherJob: Job) {