Re-integrate LokiLongPoller

This commit is contained in:
Niels Andriesse 2019-08-29 15:21:45 +10:00
parent d1d928ee89
commit 378a30e9d6
6 changed files with 46 additions and 143 deletions

View File

@ -8,6 +8,7 @@ extern NSString *const AppDelegateStoryboardMain;
@interface AppDelegate : UIResponder <UIApplicationDelegate>
- (void)stopLongPollerIfNeeded;
- (void)createGroupChatsIfNeeded;
- (void)createRSSFeedsIfNeeded;
- (void)startGroupChatPollersIfNeeded;

View File

@ -63,7 +63,10 @@ static NSTimeInterval launchStartedAt;
@property (nonatomic) BOOL hasInitialRootViewController;
@property (nonatomic) BOOL areVersionMigrationsComplete;
@property (nonatomic) BOOL didAppLaunchFail;
// Loki
@property (nonatomic) LKP2PServer *lokiP2PServer;
@property (nonatomic) LKLongPoller *lokiLongPoller;
@property (nonatomic) LKGroupChatPoller *lokiPublicChatPoller;
@property (nonatomic) LKRSSFeedPoller *lokiNewsFeedPoller;
@property (nonatomic) LKRSSFeedPoller *lokiMessengerUpdatesFeedPoller;
@ -175,7 +178,7 @@ static NSTimeInterval launchStartedAt;
[DDLog flushLog];
[LKAPI stopLongPolling];
[self stopLongPollerIfNeeded];
}
- (void)applicationWillEnterForeground:(UIApplication *)application
@ -194,7 +197,8 @@ static NSTimeInterval launchStartedAt;
[DDLog flushLog];
[LKAPI stopLongPolling];
[self stopLongPollerIfNeeded];
if (self.lokiP2PServer) { [self.lokiP2PServer stop]; }
}
@ -761,7 +765,7 @@ static NSTimeInterval launchStartedAt;
[Environment.shared.contactsManager fetchSystemContactsOnceIfAlreadyAuthorized];
// Loki: Start long polling
[LKAPI startLongPollingIfNeeded];
[self startLongPollerIfNeeded];
// Loki: Tell our friends that we are online
[LKP2PAPI broadcastOnlineStatus];
@ -1359,8 +1363,8 @@ static NSTimeInterval launchStartedAt;
// For non-legacy users, read receipts are on by default.
[self.readReceiptManager setAreReadReceiptsEnabled:YES];
// Start long polling
[LKAPI startLongPollingIfNeeded];
// Loki: Start long polling
[self startLongPollerIfNeeded];
}
}
@ -1406,23 +1410,6 @@ static NSTimeInterval launchStartedAt;
[UIViewController attemptRotationToDeviceOrientation];
}
#pragma mark - Long polling
- (void)handleNewMessagesReceived:(NSNotification *)notification
{
NSArray *messages = (NSArray *)notification.userInfo[@"messages"];
NSLog(@"[Loki] Received %lu messages through long polling.", messages.count);
for (SSKProtoEnvelope *envelope in messages) {
NSData *envelopeData = [envelope serializedDataAndReturnError:nil];
if (envelopeData != nil) {
[SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:envelopeData];
} else {
OWSFailDebug(@"Failed to deserialize envelope.");
}
}
}
#pragma mark - status bar touches
- (void)touchesBegan:(NSSet *)touches withEvent:(UIEvent *)event
@ -1487,6 +1474,34 @@ static NSTimeInterval launchStartedAt;
#pragma mark - Loki
- (void)setUpLongPollerIfNeeded
{
if (self.lokiLongPoller != nil) { return; }
NSString *userHexEncodedPublicKey = OWSIdentityManager.sharedManager.identityKeyPair.hexEncodedPublicKey;
if (userHexEncodedPublicKey == nil) { return; }
self.lokiLongPoller = [[LKLongPoller alloc] initOnMessagesReceived:^(NSArray<SSKProtoEnvelope *> *messages) {
for (SSKProtoEnvelope *message in messages) {
NSData *data = [message serializedDataAndReturnError:nil];
if (data != nil) {
[SSKEnvironment.shared.messageReceiver handleReceivedEnvelopeData:data];
} else {
NSLog(@"[Loki] Failed to deserialize envelope.");
}
}
}];
}
- (void)startLongPollerIfNeeded
{
[self setUpLongPollerIfNeeded];
[self.lokiLongPoller startIfNeeded];
}
- (void)stopLongPollerIfNeeded
{
[self.lokiLongPoller stopIfNeeded];
}
- (LKGroupChat *)lokiPublicChat
{
return [[LKGroupChat alloc] initWithServerID:@(LKGroupChatAPI.publicChatServerID).unsignedIntegerValue server:LKGroupChatAPI.publicChatServer displayName:NSLocalizedString(@"Loki Public Chat", @"") isDeletable:true];

View File

@ -2,6 +2,7 @@
// Copyright (c) 2019 Open Whisper Systems. All rights reserved.
//
#import "AppDelegate.h"
#import "AppSettingsViewController.h"
#import "AboutTableViewController.h"
#import "AdvancedSettingsTableViewController.h"
@ -533,7 +534,8 @@
[ThreadUtil deleteAllContent];
[SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool];
[LKAPI stopLongPolling];
AppDelegate *appDelegate = (AppDelegate *)UIApplication.sharedApplication.delegate;
[appDelegate stopLongPollerIfNeeded];
[SSKEnvironment.shared.tsAccountManager resetForReregistration];
UIViewController *rootViewController = [[OnboardingController new] initialViewController];
OWSNavigationController *navigationController = [[OWSNavigationController alloc] initWithRootViewController:rootViewController];

View File

@ -680,7 +680,8 @@ typedef NS_ENUM(NSInteger, HomeViewControllerSection) {
[ThreadUtil deleteAllContent];
[SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool];
[LKAPI stopLongPolling];
AppDelegate *appDelegate = (AppDelegate *)UIApplication.sharedApplication.delegate;
[appDelegate stopLongPollerIfNeeded];
[SSKEnvironment.shared.tsAccountManager resetForReregistration];
UIViewController *rootViewController = [[OnboardingController new] initialViewController];
OWSNavigationController *navigationController = [[OWSNavigationController alloc] initWithRootViewController:rootViewController];

View File

@ -1,116 +0,0 @@
import PromiseKit
private typealias Callback = () -> Void
public extension LokiAPI {
private static var isLongPolling = false
private static var shouldStopPolling = false
private static var usedSnodes = [LokiAPITarget]()
private static var cancels = [Callback]()
/// Start long polling.
/// This will send a notification if new messages were received
@objc public static func startLongPollingIfNeeded() {
guard !isLongPolling else { return }
isLongPolling = true
shouldStopPolling = false
print("[Loki] Started long polling.")
longPoll()
}
/// Stop long polling
@objc public static func stopLongPolling() {
shouldStopPolling = true
isLongPolling = false
usedSnodes.removeAll()
cancelAllPromises()
print("[Loki] Stopped long polling.")
}
/// The long polling loop
private static func longPoll() {
// This is here so we can stop the infinite loop
guard !shouldStopPolling else { return }
getSwarm(for: userHexEncodedPublicKey).then { _ -> Guarantee<[Result<Void>]> in
var promises = [Promise<Void>]()
let connections = 3
for i in 0..<connections {
let (promise, cancel) = openConnection()
promises.append(promise)
cancels.append(cancel)
}
return when(resolved: promises)
}.done { _ in
// Since all promises are complete, we can clear the cancels
cancelAllPromises()
// Keep long polling until it is stopped
longPoll()
}.retainUntilComplete()
}
private static func cancelAllPromises() {
cancels.forEach { cancel in cancel() }
cancels.removeAll()
}
private static func getUnusedSnodes() -> [LokiAPITarget] {
let snodes = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? []
return snodes.filter { !usedSnodes.contains($0) }
}
/// Open a connection to an unused snode and get messages from it
private static func openConnection() -> (Promise<Void>, cancel: Callback) {
var isCancelled = false
let cancel = {
isCancelled = true
}
func connectToNextSnode() -> Promise<Void> {
guard let nextSnode = getUnusedSnodes().first else {
// We don't have anymore unused snodes
return Promise.value(())
}
// Add the snode to the used array
usedSnodes.append(nextSnode)
func getMessagesInfinitely(from target: LokiAPITarget) -> Promise<Void> {
// The only way to exit the infinite loop is to throw an error 3 times or cancel
return getRawMessages(from: target, usingLongPolling: true).then { rawResponse -> Promise<Void> in
// Check if we need to abort
guard !isCancelled else { throw PMKError.cancelled }
// Process the messages
let messages = parseRawMessagesResponse(rawResponse, from: target)
// Send our messages as a notification
NotificationCenter.default.post(name: .newMessagesReceived, object: nil, userInfo: ["messages": messages])
// Continue fetching if we haven't cancelled
return getMessagesInfinitely(from: target)
}.retryingIfNeeded(maxRetryCount: 3)
}
// Keep getting messages for this snode
// If we errored out then connect to the next snode
return getMessagesInfinitely(from: nextSnode).recover { _ -> Promise<Void> in
// Cancelled, so just return successfully
guard !isCancelled else { return Promise.value(()) }
// Connect to the next snode if we haven't cancelled
// We also need to remove the cached snode so we don't contact it again
dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey)
return connectToNextSnode()
}
}
// Keep connecting to snodes
return (connectToNextSnode(), cancel)
}
}

View File

@ -17,13 +17,13 @@ public final class LokiLongPoller : NSObject {
private var userHexEncodedPublicKey: String { return OWSIdentityManager.shared().identityKeyPair()!.hexEncodedPublicKey }
// MARK: Initialization
public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) {
@objc public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) {
self.onMessagesReceived = onMessagesReceived
super.init()
}
// MARK: Public API
public func startIfNeeded() {
@objc public func startIfNeeded() {
guard !hasStarted else { return }
print("[Loki] Started long polling.")
hasStarted = true
@ -31,7 +31,7 @@ public final class LokiLongPoller : NSObject {
openConnections()
}
public func stopIfNeeded() {
@objc public func stopIfNeeded() {
guard !hasStopped else { return }
print("[Loki] Stopped long polling.")
hasStarted = false