From a5329763333ffdea596fd341439b45fb2666c0f5 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 23 May 2023 15:02:37 +1000 Subject: [PATCH] Updated the JobRunner to wait until the current transaction is done before running a job --- SessionUtilitiesKit/JobRunner/JobRunner.swift | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 7a56a24be..7906fd532 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -138,7 +138,7 @@ public final class JobRunner { return } - queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } + queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob) // Don't start the queue if the job can't be started guard canStartJob else { return } @@ -161,7 +161,7 @@ public final class JobRunner { return } - queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) + queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob) // Don't start the queue if the job can't be started guard canStartJob else { return } @@ -524,7 +524,7 @@ private final class JobQueue { // MARK: - Execution - fileprivate func add(_ job: Job, canStartJob: Bool = true) { + fileprivate func add(_ db: Database, job: Job, canStartJob: Bool = true) { // Check if the job should be added to the queue guard canStartJob, @@ -541,7 +541,11 @@ private final class JobQueue { // If this is a concurrent queue then we should immediately start the next job guard executionType == .concurrent else { return } - runNextJob() + // Ensure that the database commit has completed and then trigger the next job to run (need + // to ensure any interactions have been correctly inserted first) + db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in + self?.runNextJob() + } } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start @@ -549,7 +553,7 @@ private final class JobQueue { /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started - fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { + fileprivate func upsert(_ db: Database, job: Job, canStartJob: Bool = true) { guard let jobId: Int64 = job.id else { SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return @@ -572,7 +576,7 @@ private final class JobQueue { // If we didn't update an existing job then we need to add it to the queue guard !didUpdateExistingJob else { return } - add(job, canStartJob: canStartJob) + add(db, job: job, canStartJob: canStartJob) } fileprivate func insert(_ job: Job, before otherJob: Job) {