From b2ab984586a4ca5e2d6f0def86343bf4cafbc91d Mon Sep 17 00:00:00 2001 From: ryanzhao Date: Tue, 22 Feb 2022 14:01:48 +1100 Subject: [PATCH] refactor on timer and polling threading --- .../Pollers/ClosedGroupPoller.swift | 24 +++++-------- .../Pollers/OpenGroupPollerV2.swift | 35 ++++++++----------- .../Sending & Receiving/Pollers/Poller.swift | 27 +++++++------- .../PromiseKit/Promise+Delaying.swift | 11 +++--- 4 files changed, 44 insertions(+), 53 deletions(-) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index 459b6c56b..a65220a3a 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -61,13 +61,11 @@ public final class ClosedGroupPoller : NSObject { // MARK: Private API private func setUpPolling(for groupPublicKey: String) { - poll(groupPublicKey).done2 { [weak self] _ in - DispatchQueue.main.async { // Timers don't do well on background queues + Threading.closedGroupPollerQueue.async { + self.poll(groupPublicKey).done(on: Threading.closedGroupPollerQueue) { [weak self] _ in self?.pollRecursively(groupPublicKey) - } - }.catch2 { [weak self] error in - // The error is logged in poll(_:) - DispatchQueue.main.async { // Timers don't do well on background queues + }.catch(on: Threading.closedGroupPollerQueue) { [weak self] error in + // The error is logged in poll(_:) self?.pollRecursively(groupPublicKey) } } @@ -87,18 +85,14 @@ public final class ClosedGroupPoller : NSObject { let a = (ClosedGroupPoller.maxPollInterval - minPollInterval) / limit let nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval SNLog("Next poll interval for closed group with public key: \(groupPublicKey) is \(nextPollInterval) s.") - timers[groupPublicKey] = Timer.scheduledTimer(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in + timers[groupPublicKey] = Timer.scheduledTimerOnMainThread(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in timer.invalidate() Threading.closedGroupPollerQueue.async { - self?.poll(groupPublicKey).done2 { _ in - DispatchQueue.main.async { // Timers don't do well on background queues - self?.pollRecursively(groupPublicKey) - } - }.catch2 { error in + self?.poll(groupPublicKey).done(on: Threading.closedGroupPollerQueue) { _ in + self?.pollRecursively(groupPublicKey) + }.catch(on: Threading.closedGroupPollerQueue) { error in // The error is logged in poll(_:) - DispatchQueue.main.async { // Timers don't do well on background queues - self?.pollRecursively(groupPublicKey) - } + self?.pollRecursively(groupPublicKey) } } } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift index 9e50c2717..8ef1ffcca 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPollerV2.swift @@ -19,18 +19,11 @@ public final class OpenGroupPollerV2 : NSObject { @objc public func startIfNeeded() { guard !hasStarted else { return } - DispatchQueue.main.async { [weak self] in // Timers don't do well on background queues - guard let strongSelf = self else { return } - strongSelf.hasStarted = true - strongSelf.timer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollInterval, repeats: true) { _ in - Threading.openGroupPollerQueue.async { - self?.poll().retainUntilComplete() - } - } - Threading.openGroupPollerQueue.async { - strongSelf.poll().retainUntilComplete() - } + hasStarted = true + timer = Timer.scheduledTimerOnMainThread(withTimeInterval: pollInterval, repeats: true) { _ in + self.poll().retainUntilComplete() } + poll().retainUntilComplete() } @objc public func stop() { @@ -50,15 +43,17 @@ public final class OpenGroupPollerV2 : NSObject { self.isPolling = true let (promise, seal) = Promise.pending() promise.retainUntilComplete() - OpenGroupAPIV2.compactPoll(server).done(on: OpenGroupAPIV2.workQueue) { [weak self] bodies in - guard let self = self else { return } - self.isPolling = false - bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) } - seal.fulfill(()) - }.catch(on: OpenGroupAPIV2.workQueue) { error in - SNLog("Open group polling failed due to error: \(error).") - self.isPolling = false - seal.fulfill(()) // The promise is just used to keep track of when we're done + Threading.openGroupPollerQueue.async { + OpenGroupAPIV2.compactPoll(self.server).done(on: OpenGroupAPIV2.workQueue) { [weak self] bodies in + guard let self = self else { return } + self.isPolling = false + bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) } + seal.fulfill(()) + }.catch(on: OpenGroupAPIV2.workQueue) { error in + SNLog("Open group polling failed due to error: \(error).") + self.isPolling = false + seal.fulfill(()) // The promise is just used to keep track of when we're done + } } return promise } diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index cecc33f9b..8885e5542 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -45,21 +45,22 @@ public final class Poller : NSObject { // MARK: Private API private func setUpPolling() { guard isPolling else { return } - let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise in - guard let strongSelf = self else { return Promise { $0.fulfill(()) } } - strongSelf.usedSnodes.removeAll() - let (promise, seal) = Promise.pending() - strongSelf.pollNextSnode(seal: seal) - return promise - }.ensure(on: DispatchQueue.main) { [weak self] in // Timers don't do well on background queues - guard let strongSelf = self, strongSelf.isPolling else { return } - Timer.scheduledTimer(withTimeInterval: Poller.retryInterval, repeats: false) { _ in - guard let strongSelf = self else { return } - Threading.pollerQueue.async { + Threading.pollerQueue.async { + let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then(on: Threading.pollerQueue) { [weak self] _ -> Promise in + guard let strongSelf = self else { return Promise { $0.fulfill(()) } } + strongSelf.usedSnodes.removeAll() + let (promise, seal) = Promise.pending() + strongSelf.pollNextSnode(seal: seal) + return promise + }.ensure(on: Threading.pollerQueue) { [weak self] in // Timers don't do well on background queues + guard let strongSelf = self, strongSelf.isPolling else { return } + Timer.scheduledTimerOnMainThread(withTimeInterval: Poller.retryInterval, repeats: false) { _ in + guard let strongSelf = self else { return } strongSelf.setUpPolling() } } } + } private func pollNextSnode(seal: Resolver) { @@ -70,9 +71,9 @@ public final class Poller : NSObject { // randomElement() uses the system's default random generator, which is cryptographically secure let nextSnode = unusedSnodes.randomElement()! usedSnodes.insert(nextSnode) - poll(nextSnode, seal: seal).done2 { + poll(nextSnode, seal: seal).done(on: Threading.pollerQueue) { seal.fulfill(()) - }.catch2 { [weak self] error in + }.catch(on: Threading.pollerQueue) { [weak self] error in if let error = error as? Error, error == .pollLimitReached { self?.pollCount = 0 } else { diff --git a/SessionUtilitiesKit/PromiseKit/Promise+Delaying.swift b/SessionUtilitiesKit/PromiseKit/Promise+Delaying.swift index 7e02f4e88..02401a14e 100644 --- a/SessionUtilitiesKit/PromiseKit/Promise+Delaying.swift +++ b/SessionUtilitiesKit/PromiseKit/Promise+Delaying.swift @@ -2,12 +2,13 @@ import PromiseKit /// Delay the execution of the promise constructed in `body` by `delay` seconds. public func withDelay(_ delay: TimeInterval, completionQueue: DispatchQueue, body: @escaping () -> Promise) -> Promise { - #if DEBUG - assert(Thread.current.isMainThread) // Timers don't do well on background queues - #endif let (promise, seal) = Promise.pending() - Timer.scheduledTimer(withTimeInterval: delay, repeats: false) { _ in - body().done(on: completionQueue) { seal.fulfill($0) }.catch(on: completionQueue) { seal.reject($0) } + Timer.scheduledTimerOnMainThread(withTimeInterval: delay, repeats: false) { _ in + body().done(on: completionQueue) { + seal.fulfill($0) + }.catch(on: completionQueue) { + seal.reject($0) + } } return promise }