Fix sealed sender related multi device issues

This commit is contained in:
Niels Andriesse 2020-02-14 16:32:47 +11:00
parent 8b201f594e
commit bbb999dc83
10 changed files with 110 additions and 60 deletions

View File

@ -5,7 +5,7 @@
<key>BuildDetails</key>
<dict>
<key>CarthageVersion</key>
<string>0.34.0</string>
<string>0.33.0</string>
<key>OSXVersion</key>
<string>10.15.3</string>
<key>WebRTCCommit</key>

View File

@ -10,24 +10,10 @@ public extension LokiAPI {
fileprivate static let failureThreshold = 2
// MARK: Caching
internal static var swarmCache: [String:[LokiAPITarget]] = [:]
private static let swarmCacheKey = "swarmCacheKey"
private static let swarmCacheCollection = "swarmCacheCollection"
internal static var swarmCache: [String:[LokiAPITarget]] {
get {
var result: [String:[LokiAPITarget]]? = nil
storage.dbReadConnection.read { transaction in
result = transaction.object(forKey: swarmCacheKey, inCollection: swarmCacheCollection) as! [String:[LokiAPITarget]]?
}
return result ?? [:]
}
set {
storage.dbReadWriteConnection.readWrite { transaction in
transaction.setObject(newValue, forKey: swarmCacheKey, inCollection: swarmCacheCollection)
}
}
}
internal static func dropIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) {
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {

View File

@ -105,9 +105,17 @@ public final class LokiAPI : NSObject {
}
public static func getDestinations(for hexEncodedPublicKey: String) -> Promise<[Destination]> {
var result: Promise<[Destination]>!
storage.dbReadConnection.readWrite { transaction in
result = getDestinations(for: hexEncodedPublicKey, in: transaction)
}
return result
}
public static func getDestinations(for hexEncodedPublicKey: String, in transaction: YapDatabaseReadWriteTransaction) -> Promise<[Destination]> {
let (promise, seal) = Promise<[Destination]>.pending()
func getDestinations() {
storage.dbReadConnection.read { transaction in
func getDestinations(in transaction: YapDatabaseReadTransaction? = nil) {
func getDestinationsInternal(in transaction: YapDatabaseReadTransaction) {
var destinations: [Destination] = []
let masterHexEncodedPublicKey = storage.getMasterHexEncodedPublicKey(for: hexEncodedPublicKey, in: transaction) ?? hexEncodedPublicKey
let masterDestination = Destination(hexEncodedPublicKey: masterHexEncodedPublicKey, kind: .master)
@ -117,6 +125,13 @@ public final class LokiAPI : NSObject {
destinations.append(contentsOf: slaveDestinations)
seal.fulfill(destinations)
}
if let transaction = transaction {
getDestinationsInternal(in: transaction)
} else {
storage.dbReadConnection.read { transaction in
getDestinationsInternal(in: transaction)
}
}
}
let timeSinceLastUpdate: TimeInterval
if let lastDeviceLinkUpdate = lastDeviceLinkUpdate[hexEncodedPublicKey] {
@ -125,23 +140,21 @@ public final class LokiAPI : NSObject {
timeSinceLastUpdate = .infinity
}
if timeSinceLastUpdate > deviceLinkUpdateInterval {
storage.dbReadConnection.read { transaction in
let masterHexEncodedPublicKey = storage.getMasterHexEncodedPublicKey(for: hexEncodedPublicKey, in: transaction) ?? hexEncodedPublicKey
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey).done(on: DispatchQueue.global()) { _ in
getDestinations()
let masterHexEncodedPublicKey = storage.getMasterHexEncodedPublicKey(for: hexEncodedPublicKey, in: transaction) ?? hexEncodedPublicKey
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey, in: transaction).done(on: DispatchQueue.global()) { _ in
getDestinations()
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
}.catch(on: DispatchQueue.global()) { error in
if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed {
// Don't immediately re-fetch in case of failure due to a parsing error
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
}.catch(on: DispatchQueue.global()) { error in
if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed {
// Don't immediately re-fetch in case of failure due to a parsing error
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
getDestinations()
} else {
seal.reject(error)
}
getDestinations()
} else {
seal.reject(error)
}
}
} else {
getDestinations()
getDestinations(in: transaction)
}
return promise
}
@ -205,6 +218,12 @@ public final class LokiAPI : NSObject {
return AnyPromise.from(promise)
}
@objc(getDestinationsFor:inTransaction:)
public static func objc_getDestinations(for hexEncodedPublicKey: String, in transaction: YapDatabaseReadWriteTransaction) -> AnyPromise {
let promise = getDestinations(for: hexEncodedPublicKey, in: transaction)
return AnyPromise.from(promise)
}
@objc(sendSignalMessage:onP2PSuccess:)
public static func objc_sendSignalMessage(_ signalMessage: SignalMessage, onP2PSuccess: @escaping () -> Void) -> AnyPromise {
let promise = sendSignalMessage(signalMessage, onP2PSuccess: onP2PSuccess).mapValues { AnyPromise.from($0) }.map { Set($0) }

View File

@ -26,18 +26,32 @@ public class LokiDotNetAPI : NSObject {
/// To be overridden by subclasses.
internal class var authTokenCollection: String { preconditionFailure("authTokenCollection is abstract and must be overridden.") }
private static func getAuthTokenFromDatabase(for server: String) -> String? {
var result: String? = nil
storage.dbReadConnection.read { transaction in
result = transaction.object(forKey: server, inCollection: authTokenCollection) as! String?
private static func getAuthTokenFromDatabase(for server: String, in transaction: YapDatabaseReadTransaction? = nil) -> String? {
func getAuthTokenInternal(in transaction: YapDatabaseReadTransaction) -> String? {
return transaction.object(forKey: server, inCollection: authTokenCollection) as! String?
}
if let transaction = transaction {
return getAuthTokenInternal(in: transaction)
} else {
var result: String? = nil
storage.dbReadConnection.read { transaction in
result = getAuthTokenInternal(in: transaction)
}
return result
}
return result
}
private static func setAuthToken(for server: String, to newValue: String) {
storage.dbReadWriteConnection.readWrite { transaction in
private static func setAuthToken(for server: String, to newValue: String, in transaction: YapDatabaseReadWriteTransaction? = nil) {
func setAuthTokenInternal(in transaction: YapDatabaseReadWriteTransaction) {
transaction.setObject(newValue, forKey: server, inCollection: authTokenCollection)
}
if let transaction = transaction {
setAuthTokenInternal(in: transaction)
} else {
storage.dbReadWriteConnection.readWrite { transaction in
setAuthTokenInternal(in: transaction)
}
}
}
// MARK: Lifecycle
@ -146,12 +160,12 @@ public class LokiDotNetAPI : NSObject {
}
// MARK: Internal API
internal static func getAuthToken(for server: String) -> Promise<String> {
if let token = getAuthTokenFromDatabase(for: server) {
internal static func getAuthToken(for server: String, in transaction: YapDatabaseReadWriteTransaction? = nil) -> Promise<String> {
if let token = getAuthTokenFromDatabase(for: server, in: transaction) {
return Promise.value(token)
} else {
return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map { token -> String in
setAuthToken(for: server, to: token)
setAuthToken(for: server, to: token, in: transaction)
return token
}
}

View File

@ -19,20 +19,20 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
// MARK: Device Links (Public API)
/// Gets the device links associated with the given hex encoded public key from the
/// server and stores and returns the valid ones.
public static func getDeviceLinks(associatedWith hexEncodedPublicKey: String) -> Promise<Set<DeviceLink>> {
return getDeviceLinks(associatedWith: [ hexEncodedPublicKey ])
public static func getDeviceLinks(associatedWith hexEncodedPublicKey: String, in transaction: YapDatabaseReadWriteTransaction? = nil) -> Promise<Set<DeviceLink>> {
return getDeviceLinks(associatedWith: [ hexEncodedPublicKey ], in: transaction)
}
/// Gets the device links associated with the given hex encoded public keys from the
/// server and stores and returns the valid ones.
public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set<String>) -> Promise<Set<DeviceLink>> {
public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set<String>, in transaction: YapDatabaseReadWriteTransaction? = nil) -> Promise<Set<DeviceLink>> {
let hexEncodedPublicKeysDescription = "[ \(hexEncodedPublicKeys.joined(separator: ", ")) ]"
print("[Loki] Getting device links for: \(hexEncodedPublicKeysDescription).")
return getAuthToken(for: server).then(on: DispatchQueue.global()) { token -> Promise<Set<DeviceLink>> in
return getAuthToken(for: server, in: transaction).then(on: DispatchQueue.global()) { token -> Promise<Set<DeviceLink>> in
let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1"
let url = URL(string: "\(server)/users?\(queryParameters)")!
let request = TSRequest(url: url)
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map { $0.responseObject }.map { rawResponse -> Set<DeviceLink> in
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { $0.responseObject }.map(on: DispatchQueue.global()) { rawResponse -> Set<DeviceLink> in
guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse device links for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -74,7 +74,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
return deviceLink
}
})
}.map { deviceLinks -> Set<DeviceLink> in
}.map(on: DispatchQueue.global()) { deviceLinks -> Set<DeviceLink> in
storage.dbReadWriteConnection.readWrite { transaction in
storage.setDeviceLinks(deviceLinks, in: transaction)
}

View File

@ -83,13 +83,8 @@ public final class LokiLongPoller : NSObject {
return LokiAPI.getRawMessages(from: target, usingLongPolling: true).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) }
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
let hexEncodedPublicKeys = Set(messages.compactMap { $0.source })
let promises = hexEncodedPublicKeys.map { LokiAPI.getDestinations(for: $0) }
return when(resolved: promises).then(on: DispatchQueue.global()) { _ -> Promise<Void> in
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) }
strongSelf.onMessagesReceived(messages)
return strongSelf.longPoll(target, seal: seal)
}
strongSelf.onMessagesReceived(messages)
return strongSelf.longPoll(target, seal: seal)
}
}
}

View File

@ -9,11 +9,19 @@ NS_ASSUME_NONNULL_BEGIN
@class SSKProtoEnvelope;
@class YapDatabaseReadWriteTransaction;
@interface OWSMessageContentQueue : NSObject
- (dispatch_queue_t)serialQueue;
@end
// This class is used to write incoming (decrypted, unprocessed)
// messages to a durable queue and then process them in batches,
// in the order in which they were received.
@interface OWSBatchMessageProcessor : NSObject
@property (nonatomic, readonly) OWSMessageContentQueue *processingQueue;
- (instancetype)init NS_UNAVAILABLE;
- (instancetype)initWithPrimaryStorage:(OWSPrimaryStorage *)primaryStorage NS_DESIGNATED_INITIALIZER;

View File

@ -237,7 +237,7 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
#pragma mark - Queue Processing
@interface OWSMessageContentQueue : NSObject
@interface OWSMessageContentQueue ()
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
@property (nonatomic, readonly) OWSMessageContentJobFinder *finder;
@ -431,10 +431,8 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
void (^reportFailure)(YapDatabaseReadWriteTransaction *transaction) = ^(
YapDatabaseReadWriteTransaction *transaction) {
// TODO: Add analytics.
TSErrorMessage *errorMessage = [TSErrorMessage corruptedMessageInUnknownThread];
[SSKEnvironment.shared.notificationsManager notifyUserForThreadlessErrorMessage:errorMessage
transaction:transaction];
[SSKEnvironment.shared.notificationsManager notifyUserForThreadlessErrorMessage:errorMessage transaction:transaction];
};
@try {
@ -449,9 +447,9 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
serverID:0];
}
} @catch (NSException *exception) {
// OWSFailDebug(@"Received an invalid envelope: %@", exception.debugDescription);
reportFailure(transaction);
}
[processedJobs addObject:job];
if (self.isAppInBackground) {
@ -473,7 +471,6 @@ NSString *const OWSMessageContentJobFinderExtensionGroup = @"OWSMessageContentJo
@interface OWSBatchMessageProcessor ()
@property (nonatomic, readonly) OWSMessageContentQueue *processingQueue;
@property (nonatomic, readonly) YapDatabaseConnection *dbConnection;
@end

View File

@ -59,6 +59,8 @@
#import <YapDatabase/YapDatabase.h>
#import <SignalServiceKit/SignalServiceKit-Swift.h>
#import "OWSDispatch.h"
#import "OWSBatchMessageProcessor.h"
#import "OWSQueues.h"
NS_ASSUME_NONNULL_BEGIN
@ -277,6 +279,13 @@ NS_ASSUME_NONNULL_BEGIN
OWSAssertDebug(![self isEnvelopeSenderBlocked:envelope]);
// Loki: Ignore any friend requests that we got before restoration
uint64_t restorationTime = [NSNumber numberWithDouble:[OWSPrimaryStorage.sharedManager getRestorationTime]].unsignedLongLongValue;
if (envelope.type == SSKProtoEnvelopeTypeFriendRequest && envelope.timestamp < restorationTime * 1000) {
[LKLogger print:@"[Loki] Ignoring friend request received before restoration."];
return;
}
[self checkForUnknownLinkedDevice:envelope transaction:transaction];
switch (envelope.type) {
@ -1391,6 +1400,19 @@ NS_ASSUME_NONNULL_BEGIN
return nil;
}
dispatch_queue_t messageProcessingQueue = SSKEnvironment.shared.batchMessageProcessor.processingQueue.serialQueue;
AssertOnDispatchQueue(messageProcessingQueue);
if ([ECKeyPair isValidHexEncodedPublicKeyWithCandidate:envelope.source]) {
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
[[LKAPI getDestinationsFor:envelope.source inTransaction:transaction].ensureOn(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^() {
dispatch_semaphore_signal(semaphore);
}).catchOn(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(NSError *error) {
dispatch_semaphore_signal(semaphore);
}) retainUntilComplete];
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC));
}
if (groupId.length > 0) {
NSMutableSet *newMemberIds = [NSMutableSet setWithArray:dataMessage.group.members];
NSMutableSet *removedMemberIds = [NSMutableSet new];

View File

@ -423,6 +423,15 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
successBlock:^(OWSMessageDecryptResult *result, YapDatabaseReadWriteTransaction *transaction) {
OWSAssertDebug(transaction);
// Loki: Don't process any messages from ourself
ECKeyPair *_Nullable keyPair = OWSIdentityManager.sharedManager.identityKeyPair;
if (keyPair && [result.source isEqualToString:keyPair.hexEncodedPublicKey]) {
dispatch_async(self.serialQueue, ^{
completion(YES);
});
return;
}
// We persist the decrypted envelope data in the same transaction within which
// it was decrypted to prevent data loss. If the new job isn't persisted,
// the session state side effects of its decryption are also rolled back.