Ditch long polling

This commit is contained in:
gmbnt 2020-03-25 10:27:43 +11:00
parent 7d507ba3ad
commit 5ad32af0d3
5 changed files with 41 additions and 48 deletions

View file

@ -8,8 +8,8 @@ extern NSString *const AppDelegateStoryboardMain;
@interface AppDelegate : UIResponder <UIApplicationDelegate> @interface AppDelegate : UIResponder <UIApplicationDelegate>
- (void)startLongPollerIfNeeded; - (void)startPollerIfNeeded;
- (void)stopLongPollerIfNeeded; - (void)stopPollerIfNeeded;
- (void)setUpDefaultPublicChatsIfNeeded; - (void)setUpDefaultPublicChatsIfNeeded;
- (void)startOpenGroupPollersIfNeeded; - (void)startOpenGroupPollersIfNeeded;
- (void)stopOpenGroupPollersIfNeeded; - (void)stopOpenGroupPollersIfNeeded;

View file

@ -69,7 +69,7 @@ static BOOL isInternalTestVersion = NO;
// Loki // Loki
@property (nonatomic) LKP2PServer *lokiP2PServer; @property (nonatomic) LKP2PServer *lokiP2PServer;
@property (nonatomic) LKLongPoller *lokiLongPoller; @property (nonatomic) LKPoller *lokiPoller;
@property (nonatomic) LKRSSFeedPoller *lokiNewsFeedPoller; @property (nonatomic) LKRSSFeedPoller *lokiNewsFeedPoller;
@property (nonatomic) LKRSSFeedPoller *lokiMessengerUpdatesFeedPoller; @property (nonatomic) LKRSSFeedPoller *lokiMessengerUpdatesFeedPoller;
@ -181,7 +181,7 @@ static BOOL isInternalTestVersion = NO;
[DDLog flushLog]; [DDLog flushLog];
// Loki: Stop pollers // Loki: Stop pollers
[self stopLongPollerIfNeeded]; [self stopPollerIfNeeded];
[self stopOpenGroupPollersIfNeeded]; [self stopOpenGroupPollersIfNeeded];
} }
@ -202,7 +202,7 @@ static BOOL isInternalTestVersion = NO;
[DDLog flushLog]; [DDLog flushLog];
// Loki: Stop pollers // Loki: Stop pollers
[self stopLongPollerIfNeeded]; [self stopPollerIfNeeded];
[self stopOpenGroupPollersIfNeeded]; [self stopOpenGroupPollersIfNeeded];
if (self.lokiP2PServer) { [self.lokiP2PServer stop]; } if (self.lokiP2PServer) { [self.lokiP2PServer stop]; }
@ -784,7 +784,7 @@ static BOOL isInternalTestVersion = NO;
[LKP2PAPI broadcastOnlineStatus]; [LKP2PAPI broadcastOnlineStatus];
// Loki: Start pollers // Loki: Start pollers
[self startLongPollerIfNeeded]; [self startPollerIfNeeded];
[self startOpenGroupPollersIfNeeded]; [self startOpenGroupPollersIfNeeded];
// Loki: Get device links // Loki: Get device links
@ -1477,7 +1477,7 @@ static BOOL isInternalTestVersion = NO;
[self.lokiFriendRequestExpirationJob startIfNecessary]; [self.lokiFriendRequestExpirationJob startIfNecessary];
// Loki: Start pollers // Loki: Start pollers
[self startLongPollerIfNeeded]; [self startPollerIfNeeded];
[self startOpenGroupPollersIfNeeded]; [self startOpenGroupPollersIfNeeded];
// Loki: Get device links // Loki: Get device links
@ -1591,12 +1591,12 @@ static BOOL isInternalTestVersion = NO;
#pragma mark - Loki #pragma mark - Loki
- (void)setUpLongPollerIfNeeded - (void)setUpPollerIfNeeded
{ {
if (self.lokiLongPoller != nil) { return; } if (self.lokiPoller != nil) { return; }
NSString *userHexEncodedPublicKey = OWSIdentityManager.sharedManager.identityKeyPair.hexEncodedPublicKey; NSString *userHexEncodedPublicKey = OWSIdentityManager.sharedManager.identityKeyPair.hexEncodedPublicKey;
if (userHexEncodedPublicKey == nil) { return; } if (userHexEncodedPublicKey == nil) { return; }
self.lokiLongPoller = [[LKLongPoller alloc] initOnMessagesReceived:^(NSArray<SSKProtoEnvelope *> *messages) { self.lokiPoller = [[LKPoller alloc] initOnMessagesReceived:^(NSArray<SSKProtoEnvelope *> *messages) {
for (SSKProtoEnvelope *message in messages) { for (SSKProtoEnvelope *message in messages) {
NSData *data = [message serializedDataAndReturnError:nil]; NSData *data = [message serializedDataAndReturnError:nil];
if (data != nil) { if (data != nil) {
@ -1608,15 +1608,15 @@ static BOOL isInternalTestVersion = NO;
}]; }];
} }
- (void)startLongPollerIfNeeded - (void)startPollerIfNeeded
{ {
[self setUpLongPollerIfNeeded]; [self setUpPollerIfNeeded];
[self.lokiLongPoller startIfNeeded]; [self.lokiPoller startIfNeeded];
} }
- (void)stopLongPollerIfNeeded - (void)stopPollerIfNeeded
{ {
[self.lokiLongPoller stopIfNeeded]; [self.lokiPoller stopIfNeeded];
} }
- (void)setUpDefaultPublicChatsIfNeeded - (void)setUpDefaultPublicChatsIfNeeded
@ -1713,7 +1713,7 @@ static BOOL isInternalTestVersion = NO;
[SSKEnvironment.shared.messageSenderJobQueue clearAllJobs]; [SSKEnvironment.shared.messageSenderJobQueue clearAllJobs];
[SSKEnvironment.shared.identityManager clearIdentityKey]; [SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool]; [LKAPI clearRandomSnodePool];
[self stopLongPollerIfNeeded]; [self stopPollerIfNeeded];
[self stopOpenGroupPollersIfNeeded]; [self stopOpenGroupPollersIfNeeded];
[self.lokiNewsFeedPoller stop]; [self.lokiNewsFeedPoller stop];
[self.lokiMessengerUpdatesFeedPoller stop]; [self.lokiMessengerUpdatesFeedPoller stop];

View file

@ -155,7 +155,7 @@ final class LandingVC : BaseVC, LinkDeviceVCDelegate, DeviceLinkingModalDelegate
TSAccountManager.sharedInstance().phoneNumberAwaitingVerification = keyPair.hexEncodedPublicKey TSAccountManager.sharedInstance().phoneNumberAwaitingVerification = keyPair.hexEncodedPublicKey
TSAccountManager.sharedInstance().didRegister() TSAccountManager.sharedInstance().didRegister()
let appDelegate = UIApplication.shared.delegate as! AppDelegate let appDelegate = UIApplication.shared.delegate as! AppDelegate
appDelegate.startLongPollerIfNeeded() appDelegate.startPollerIfNeeded()
let deviceLinkingModal = DeviceLinkingModal(mode: .slave, delegate: self) let deviceLinkingModal = DeviceLinkingModal(mode: .slave, delegate: self)
deviceLinkingModal.modalPresentationStyle = .overFullScreen deviceLinkingModal.modalPresentationStyle = .overFullScreen
deviceLinkingModal.modalTransitionStyle = .crossDissolve deviceLinkingModal.modalTransitionStyle = .crossDissolve
@ -176,7 +176,7 @@ final class LandingVC : BaseVC, LinkDeviceVCDelegate, DeviceLinkingModalDelegate
func handleDeviceLinkingModalDismissed() { func handleDeviceLinkingModalDismissed() {
let appDelegate = UIApplication.shared.delegate as! AppDelegate let appDelegate = UIApplication.shared.delegate as! AppDelegate
appDelegate.stopLongPollerIfNeeded() appDelegate.stopPollerIfNeeded()
TSAccountManager.sharedInstance().resetForReregistration() TSAccountManager.sharedInstance().resetForReregistration()
} }

View file

@ -681,7 +681,7 @@ typedef NS_ENUM(NSInteger, HomeViewControllerSection) {
[SSKEnvironment.shared.identityManager clearIdentityKey]; [SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool]; [LKAPI clearRandomSnodePool];
AppDelegate *appDelegate = (AppDelegate *)UIApplication.sharedApplication.delegate; AppDelegate *appDelegate = (AppDelegate *)UIApplication.sharedApplication.delegate;
[appDelegate stopLongPollerIfNeeded]; [appDelegate stopPollerIfNeeded];
[appDelegate stopOpenGroupPollersIfNeeded]; [appDelegate stopOpenGroupPollersIfNeeded];
[SSKEnvironment.shared.tsAccountManager resetForReregistration]; [SSKEnvironment.shared.tsAccountManager resetForReregistration];
UIViewController *rootViewController = [[OnboardingController new] initialViewController]; UIViewController *rootViewController = [[OnboardingController new] initialViewController];

View file

@ -1,20 +1,16 @@
import PromiseKit import PromiseKit
@objc(LKLongPoller) @objc(LKPoller)
public final class LokiLongPoller : NSObject { public final class LokiPoller : NSObject {
private let onMessagesReceived: ([SSKProtoEnvelope]) -> Void private let onMessagesReceived: ([SSKProtoEnvelope]) -> Void
private let storage = OWSPrimaryStorage.shared() private let storage = OWSPrimaryStorage.shared()
private var hasStarted = false private var hasStarted = false
private var hasStopped = false private var hasStopped = false
private var connections = Set<Promise<Void>>()
private var usedSnodes = Set<LokiAPITarget>() private var usedSnodes = Set<LokiAPITarget>()
// MARK: Settings // MARK: Settings
private let connectionCount = 3 private static let pollInterval: TimeInterval = 1
private let retryInterval: TimeInterval = 4 private static let retryInterval: TimeInterval = 4
// MARK: Convenience
private var userHexEncodedPublicKey: String { return getUserHexEncodedPublicKey() }
// MARK: Initialization // MARK: Initialization
@objc public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) { @objc public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) {
@ -25,7 +21,7 @@ public final class LokiLongPoller : NSObject {
// MARK: Public API // MARK: Public API
@objc public func startIfNeeded() { @objc public func startIfNeeded() {
guard !hasStarted else { return } guard !hasStarted else { return }
print("[Loki] Started long polling.") print("[Loki] Started polling.")
hasStarted = true hasStarted = true
hasStopped = false hasStopped = false
openConnections() openConnections()
@ -33,7 +29,7 @@ public final class LokiLongPoller : NSObject {
@objc public func stopIfNeeded() { @objc public func stopIfNeeded() {
guard !hasStopped else { return } guard !hasStopped else { return }
print("[Loki] Stopped long polling.") print("[Loki] Stopped polling.")
hasStarted = false hasStarted = false
hasStopped = true hasStopped = true
usedSnodes.removeAll() usedSnodes.removeAll()
@ -42,49 +38,46 @@ public final class LokiLongPoller : NSObject {
// MARK: Private API // MARK: Private API
private func openConnections() { private func openConnections() {
guard !hasStopped else { return } guard !hasStopped else { return }
LokiAPI.getSwarm(for: userHexEncodedPublicKey).then { [weak self] _ -> Guarantee<[Result<Void>]> in LokiAPI.getSwarm(for: getUserHexEncodedPublicKey()).then { [weak self] _ -> Promise<Void> in
guard let strongSelf = self else { return Guarantee.value([Result<Void>]()) } guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
strongSelf.usedSnodes.removeAll() strongSelf.usedSnodes.removeAll()
let connections: [Promise<Void>] = (0..<strongSelf.connectionCount).map { _ in let (promise, seal) = Promise<Void>.pending()
let (promise, seal) = Promise<Void>.pending() strongSelf.pollNextSnode(seal: seal)
strongSelf.openConnectionToNextSnode(seal: seal) return promise
return promise
}
strongSelf.connections = Set(connections)
return when(resolved: connections)
}.ensure { [weak self] in }.ensure { [weak self] in
guard let strongSelf = self else { return } guard let strongSelf = self else { return }
Timer.scheduledTimer(withTimeInterval: strongSelf.retryInterval, repeats: false) { _ in Timer.scheduledTimer(withTimeInterval: LokiPoller.retryInterval, repeats: false) { _ in
guard let strongSelf = self else { return } guard let strongSelf = self else { return }
strongSelf.openConnections() strongSelf.openConnections()
} }
} }
} }
private func openConnectionToNextSnode(seal: Resolver<Void>) { private func pollNextSnode(seal: Resolver<Void>) {
let userHexEncodedPublicKey = getUserHexEncodedPublicKey()
let swarm = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? [] let swarm = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? []
let userHexEncodedPublicKey = self.userHexEncodedPublicKey
let unusedSnodes = Set(swarm).subtracting(usedSnodes) let unusedSnodes = Set(swarm).subtracting(usedSnodes)
if !unusedSnodes.isEmpty { if !unusedSnodes.isEmpty {
// randomElement() uses the system's default random generator, which is cryptographically secure
let nextSnode = unusedSnodes.randomElement()! let nextSnode = unusedSnodes.randomElement()!
usedSnodes.insert(nextSnode) usedSnodes.insert(nextSnode)
print("[Loki] Opening long polling connection to \(nextSnode).") print("[Loki] Polling \(nextSnode).")
longPoll(nextSnode, seal: seal).catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in poll(nextSnode, seal: seal).catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in
print("[Loki] Long polling connection to \(nextSnode) failed; dropping it and switching to next snode.") print("[Loki] Polling \(nextSnode) failed; dropping it and switching to next snode.")
LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey) LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey)
self?.openConnectionToNextSnode(seal: seal) self?.pollNextSnode(seal: seal)
} }
} else { } else {
seal.fulfill(()) seal.fulfill(())
} }
} }
private func longPoll(_ target: LokiAPITarget, seal: Resolver<Void>) -> Promise<Void> { private func poll(_ target: LokiAPITarget, seal: Resolver<Void>) -> Promise<Void> {
return LokiAPI.getRawMessages(from: target, usingLongPolling: true).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) } guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) }
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target) let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
strongSelf.onMessagesReceived(messages) strongSelf.onMessagesReceived(messages)
return strongSelf.longPoll(target, seal: seal) return strongSelf.poll(target, seal: seal)
} }
} }
} }