refactor to use Atomic wrapper

This commit is contained in:
Ryan Zhao 2022-04-04 14:40:24 +10:00
parent a22dc15249
commit 43ca54c0a0
8 changed files with 17 additions and 20 deletions

View file

@ -61,7 +61,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
// MARK: Running // MARK: Running
public func execute() { public func execute() {
if let id = id { if let id = id {
JobQueue.currentlyExecutingJobs.insert(id) JobQueue.currentlyExecutingJobs.mutate{ $0.insert(id) }
} }
guard !isDeferred else { return } guard !isDeferred else { return }
if TSAttachment.fetch(uniqueId: attachmentID) is TSAttachmentStream { if TSAttachment.fetch(uniqueId: attachmentID) is TSAttachmentStream {

View file

@ -61,7 +61,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
// MARK: Running // MARK: Running
public func execute() { public func execute() {
if let id = id { if let id = id {
JobQueue.currentlyExecutingJobs.insert(id) JobQueue.currentlyExecutingJobs.mutate{ $0.insert(id) }
} }
guard let stream = TSAttachment.fetch(uniqueId: attachmentID) as? TSAttachmentStream else { guard let stream = TSAttachment.fetch(uniqueId: attachmentID) as? TSAttachmentStream else {
return handleFailure(error: Error.noAttachment) return handleFailure(error: Error.noAttachment)

View file

@ -5,9 +5,7 @@ public final class JobQueue : NSObject, JobDelegate {
private static var jobIDs: [UInt64:UInt64] = [:] private static var jobIDs: [UInt64:UInt64] = [:]
internal static var currentlyExecutingJobs: Set<String> = [] internal static var currentlyExecutingJobs: Atomic<Set<String>> = Atomic([])
private let internalQueue: DispatchQueue = DispatchQueue(label:"executingJobQueue")
@objc public static let shared = JobQueue() @objc public static let shared = JobQueue()
@ -38,7 +36,7 @@ public final class JobQueue : NSObject, JobDelegate {
allJobTypes.forEach { type in allJobTypes.forEach { type in
let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type) let allPendingJobs = SNMessagingKitConfiguration.shared.storage.getAllPendingJobs(of: type)
allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { job in // Retry the oldest jobs first
guard !JobQueue.currentlyExecutingJobs.contains(job.id!) else { guard !JobQueue.currentlyExecutingJobs.wrappedValue.contains(job.id!) else {
return SNLog("Not resuming already executing job.") return SNLog("Not resuming already executing job.")
} }
SNLog("Resuming pending job of type: \(type).") SNLog("Resuming pending job of type: \(type).")
@ -95,7 +93,7 @@ public final class JobQueue : NSObject, JobDelegate {
} }
private func removeExecutingJob(_ jobID: String) { private func removeExecutingJob(_ jobID: String) {
let _ = internalQueue.sync { JobQueue.currentlyExecutingJobs.remove(jobID) } JobQueue.currentlyExecutingJobs.mutate { $0.remove(jobID) }
} }
private func getRetryInterval(for job: Job) -> TimeInterval { private func getRetryInterval(for job: Job) -> TimeInterval {

View file

@ -59,7 +59,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
public func execute() -> Promise<Void> { public func execute() -> Promise<Void> {
if let id = id { // Can be nil (e.g. when background polling) if let id = id { // Can be nil (e.g. when background polling)
JobQueue.currentlyExecutingJobs.insert(id) JobQueue.currentlyExecutingJobs.mutate { $0.insert(id) }
} }
let (promise, seal) = Promise<Void>.pending() let (promise, seal) = Promise<Void>.pending()
SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self SNMessagingKitConfiguration.shared.storage.write(with: { transaction in // Intentionally capture self

View file

@ -71,7 +71,7 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
// MARK: Running // MARK: Running
public func execute() { public func execute() {
if let id = id { if let id = id {
JobQueue.currentlyExecutingJobs.insert(id) JobQueue.currentlyExecutingJobs.mutate{ $0.insert(id) }
} }
let storage = SNMessagingKitConfiguration.shared.storage let storage = SNMessagingKitConfiguration.shared.storage
if let message = message as? VisibleMessage { if let message = message as? VisibleMessage {

View file

@ -39,7 +39,7 @@ public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSC
public func execute() -> Promise<Void> { public func execute() -> Promise<Void> {
if let id = id { if let id = id {
JobQueue.currentlyExecutingJobs.insert(id) JobQueue.currentlyExecutingJobs.mutate{ $0.insert(id) }
} }
let server = PushNotificationAPI.server let server = PushNotificationAPI.server
let parameters = [ "data" : message.data.description, "send_to" : message.recipient ] let parameters = [ "data" : message.data.description, "send_to" : message.recipient ]

View file

@ -3,7 +3,7 @@ import SessionSnodeKit
@objc(SNOpenGroupAPIV2) @objc(SNOpenGroupAPIV2)
public final class OpenGroupAPIV2 : NSObject { public final class OpenGroupAPIV2 : NSObject {
private static var authTokenPromises: [String:Promise<String>] = [:] private static var authTokenPromises: Atomic<[String:Promise<String>]> = Atomic([:])
private static var hasPerformedInitialPoll: [String:Bool] = [:] private static var hasPerformedInitialPoll: [String:Bool] = [:]
private static var hasUpdatedLastOpenDate = false private static var hasUpdatedLastOpenDate = false
public static let workQueue = DispatchQueue(label: "OpenGroupAPIV2.workQueue", qos: .userInitiated) // It's important that this is a serial queue public static let workQueue = DispatchQueue(label: "OpenGroupAPIV2.workQueue", qos: .userInitiated) // It's important that this is a serial queue
@ -210,7 +210,7 @@ public final class OpenGroupAPIV2 : NSObject {
if let authToken = storage.getAuthToken(for: room, on: server) { if let authToken = storage.getAuthToken(for: room, on: server) {
return Promise.value(authToken) return Promise.value(authToken)
} else { } else {
if let authTokenPromise = authTokenPromises["\(server).\(room)"] { if let authTokenPromise = authTokenPromises.wrappedValue["\(server).\(room)"] {
return authTokenPromise return authTokenPromise
} else { } else {
let promise = requestNewAuthToken(for: room, on: server) let promise = requestNewAuthToken(for: room, on: server)
@ -225,11 +225,11 @@ public final class OpenGroupAPIV2 : NSObject {
return promise return promise
} }
promise.done(on: OpenGroupAPIV2.workQueue) { _ in promise.done(on: OpenGroupAPIV2.workQueue) { _ in
authTokenPromises["\(server).\(room)"] = nil authTokenPromises.mutate{ $0["\(server).\(room)"] = nil }
}.catch(on: OpenGroupAPIV2.workQueue) { _ in }.catch(on: OpenGroupAPIV2.workQueue) { _ in
authTokenPromises["\(server).\(room)"] = nil authTokenPromises.mutate{ $0["\(server).\(room)"] = nil }
} }
authTokenPromises["\(server).\(room)"] = promise authTokenPromises.mutate{ $0["\(server).\(room)"] = promise }
return promise return promise
} }
} }

View file

@ -3,9 +3,8 @@ import PromiseKit
@objc(LKClosedGroupPoller) @objc(LKClosedGroupPoller)
public final class ClosedGroupPoller : NSObject { public final class ClosedGroupPoller : NSObject {
private var isPolling: [String:Bool] = [:] private var isPolling: Atomic<[String:Bool]> = Atomic([:])
private var timers: [String:Timer] = [:] private var timers: [String:Timer] = [:]
private let internalQueue: DispatchQueue = DispatchQueue(label:"isPollingQueue")
// MARK: Settings // MARK: Settings
private static let minPollInterval: Double = 2 private static let minPollInterval: Double = 2
@ -44,7 +43,7 @@ public final class ClosedGroupPoller : NSObject {
// Might be a race condition that the setUpPolling finishes too soon, // Might be a race condition that the setUpPolling finishes too soon,
// and the timer is not created, if we mark the group as is polling // and the timer is not created, if we mark the group as is polling
// after setUpPolling. So the poller may not work, thus misses messages. // after setUpPolling. So the poller may not work, thus misses messages.
internalQueue.sync{ isPolling[groupPublicKey] = true } isPolling.mutate{ $0[groupPublicKey] = true }
setUpPolling(for: groupPublicKey) setUpPolling(for: groupPublicKey)
} }
@ -55,7 +54,7 @@ public final class ClosedGroupPoller : NSObject {
} }
public func stopPolling(for groupPublicKey: String) { public func stopPolling(for groupPublicKey: String) {
internalQueue.sync{ isPolling[groupPublicKey] = false } isPolling.mutate{ $0[groupPublicKey] = false }
timers[groupPublicKey]?.invalidate() timers[groupPublicKey]?.invalidate()
} }
@ -139,6 +138,6 @@ public final class ClosedGroupPoller : NSObject {
// MARK: Convenience // MARK: Convenience
private func isPolling(for groupPublicKey: String) -> Bool { private func isPolling(for groupPublicKey: String) -> Bool {
return internalQueue.sync{ isPolling[groupPublicKey] ?? false } return isPolling.wrappedValue[groupPublicKey] ?? false
} }
} }