refactor on timer and polling threading

This commit is contained in:
ryanzhao 2022-02-22 14:01:48 +11:00
parent eec3d31109
commit b2ab984586
4 changed files with 44 additions and 53 deletions

View File

@ -61,13 +61,11 @@ public final class ClosedGroupPoller : NSObject {
// MARK: Private API
private func setUpPolling(for groupPublicKey: String) {
poll(groupPublicKey).done2 { [weak self] _ in
DispatchQueue.main.async { // Timers don't do well on background queues
Threading.closedGroupPollerQueue.async {
self.poll(groupPublicKey).done(on: Threading.closedGroupPollerQueue) { [weak self] _ in
self?.pollRecursively(groupPublicKey)
}
}.catch2 { [weak self] error in
// The error is logged in poll(_:)
DispatchQueue.main.async { // Timers don't do well on background queues
}.catch(on: Threading.closedGroupPollerQueue) { [weak self] error in
// The error is logged in poll(_:)
self?.pollRecursively(groupPublicKey)
}
}
@ -87,18 +85,14 @@ public final class ClosedGroupPoller : NSObject {
let a = (ClosedGroupPoller.maxPollInterval - minPollInterval) / limit
let nextPollInterval = a * min(timeSinceLastMessage, limit) + minPollInterval
SNLog("Next poll interval for closed group with public key: \(groupPublicKey) is \(nextPollInterval) s.")
timers[groupPublicKey] = Timer.scheduledTimer(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in
timers[groupPublicKey] = Timer.scheduledTimerOnMainThread(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in
timer.invalidate()
Threading.closedGroupPollerQueue.async {
self?.poll(groupPublicKey).done2 { _ in
DispatchQueue.main.async { // Timers don't do well on background queues
self?.pollRecursively(groupPublicKey)
}
}.catch2 { error in
self?.poll(groupPublicKey).done(on: Threading.closedGroupPollerQueue) { _ in
self?.pollRecursively(groupPublicKey)
}.catch(on: Threading.closedGroupPollerQueue) { error in
// The error is logged in poll(_:)
DispatchQueue.main.async { // Timers don't do well on background queues
self?.pollRecursively(groupPublicKey)
}
self?.pollRecursively(groupPublicKey)
}
}
}

View File

@ -19,18 +19,11 @@ public final class OpenGroupPollerV2 : NSObject {
@objc public func startIfNeeded() {
guard !hasStarted else { return }
DispatchQueue.main.async { [weak self] in // Timers don't do well on background queues
guard let strongSelf = self else { return }
strongSelf.hasStarted = true
strongSelf.timer = Timer.scheduledTimer(withTimeInterval: strongSelf.pollInterval, repeats: true) { _ in
Threading.openGroupPollerQueue.async {
self?.poll().retainUntilComplete()
}
}
Threading.openGroupPollerQueue.async {
strongSelf.poll().retainUntilComplete()
}
hasStarted = true
timer = Timer.scheduledTimerOnMainThread(withTimeInterval: pollInterval, repeats: true) { _ in
self.poll().retainUntilComplete()
}
poll().retainUntilComplete()
}
@objc public func stop() {
@ -50,15 +43,17 @@ public final class OpenGroupPollerV2 : NSObject {
self.isPolling = true
let (promise, seal) = Promise<Void>.pending()
promise.retainUntilComplete()
OpenGroupAPIV2.compactPoll(server).done(on: OpenGroupAPIV2.workQueue) { [weak self] bodies in
guard let self = self else { return }
self.isPolling = false
bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) }
seal.fulfill(())
}.catch(on: OpenGroupAPIV2.workQueue) { error in
SNLog("Open group polling failed due to error: \(error).")
self.isPolling = false
seal.fulfill(()) // The promise is just used to keep track of when we're done
Threading.openGroupPollerQueue.async {
OpenGroupAPIV2.compactPoll(self.server).done(on: OpenGroupAPIV2.workQueue) { [weak self] bodies in
guard let self = self else { return }
self.isPolling = false
bodies.forEach { self.handleCompactPollBody($0, isBackgroundPoll: isBackgroundPoll) }
seal.fulfill(())
}.catch(on: OpenGroupAPIV2.workQueue) { error in
SNLog("Open group polling failed due to error: \(error).")
self.isPolling = false
seal.fulfill(()) // The promise is just used to keep track of when we're done
}
}
return promise
}

View File

@ -45,21 +45,22 @@ public final class Poller : NSObject {
// MARK: Private API
private func setUpPolling() {
guard isPolling else { return }
let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise<Void> in
guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
strongSelf.usedSnodes.removeAll()
let (promise, seal) = Promise<Void>.pending()
strongSelf.pollNextSnode(seal: seal)
return promise
}.ensure(on: DispatchQueue.main) { [weak self] in // Timers don't do well on background queues
guard let strongSelf = self, strongSelf.isPolling else { return }
Timer.scheduledTimer(withTimeInterval: Poller.retryInterval, repeats: false) { _ in
guard let strongSelf = self else { return }
Threading.pollerQueue.async {
Threading.pollerQueue.async {
let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then(on: Threading.pollerQueue) { [weak self] _ -> Promise<Void> in
guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
strongSelf.usedSnodes.removeAll()
let (promise, seal) = Promise<Void>.pending()
strongSelf.pollNextSnode(seal: seal)
return promise
}.ensure(on: Threading.pollerQueue) { [weak self] in // Timers don't do well on background queues
guard let strongSelf = self, strongSelf.isPolling else { return }
Timer.scheduledTimerOnMainThread(withTimeInterval: Poller.retryInterval, repeats: false) { _ in
guard let strongSelf = self else { return }
strongSelf.setUpPolling()
}
}
}
}
private func pollNextSnode(seal: Resolver<Void>) {
@ -70,9 +71,9 @@ public final class Poller : NSObject {
// randomElement() uses the system's default random generator, which is cryptographically secure
let nextSnode = unusedSnodes.randomElement()!
usedSnodes.insert(nextSnode)
poll(nextSnode, seal: seal).done2 {
poll(nextSnode, seal: seal).done(on: Threading.pollerQueue) {
seal.fulfill(())
}.catch2 { [weak self] error in
}.catch(on: Threading.pollerQueue) { [weak self] error in
if let error = error as? Error, error == .pollLimitReached {
self?.pollCount = 0
} else {

View File

@ -2,12 +2,13 @@ import PromiseKit
/// Delay the execution of the promise constructed in `body` by `delay` seconds.
public func withDelay<T>(_ delay: TimeInterval, completionQueue: DispatchQueue, body: @escaping () -> Promise<T>) -> Promise<T> {
#if DEBUG
assert(Thread.current.isMainThread) // Timers don't do well on background queues
#endif
let (promise, seal) = Promise<T>.pending()
Timer.scheduledTimer(withTimeInterval: delay, repeats: false) { _ in
body().done(on: completionQueue) { seal.fulfill($0) }.catch(on: completionQueue) { seal.reject($0) }
Timer.scheduledTimerOnMainThread(withTimeInterval: delay, repeats: false) { _ in
body().done(on: completionQueue) {
seal.fulfill($0)
}.catch(on: completionQueue) {
seal.reject($0)
}
}
return promise
}