diff --git a/Podfile.lock b/Podfile.lock index 153b82414..af8dda1fe 100644 --- a/Podfile.lock +++ b/Podfile.lock @@ -125,7 +125,7 @@ EXTERNAL SOURCES: :branch: signal-master :git: https://github.com/WhisperSystems/JSQMessagesViewController.git SignalServiceKit: - :path: . + :path: "." SocketRocket: :git: https://github.com/facebook/SocketRocket.git diff --git a/Signal/src/Jobs/MessageFetcherJob.swift b/Signal/src/Jobs/MessageFetcherJob.swift index 4fb0e100f..a63696def 100644 --- a/Signal/src/Jobs/MessageFetcherJob.swift +++ b/Signal/src/Jobs/MessageFetcherJob.swift @@ -13,13 +13,13 @@ class MessageFetcherJob: NSObject { // MARK: injected dependencies let networkManager: TSNetworkManager - let messagesManager: TSMessagesManager + let messageReceiver: OWSMessageReceiver let signalService: OWSSignalService var runPromises = [Double: Promise]() - init(messagesManager: TSMessagesManager, networkManager: TSNetworkManager, signalService: OWSSignalService) { - self.messagesManager = messagesManager + init(messageReceiver: OWSMessageReceiver, networkManager: TSNetworkManager, signalService: OWSSignalService) { + self.messageReceiver = messageReceiver self.networkManager = networkManager self.signalService = signalService } @@ -39,10 +39,8 @@ class MessageFetcherJob: NSObject { let runPromise = self.fetchUndeliveredMessages().then { (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool) -> Void in for envelope in envelopes { Logger.info("\(self.TAG) received envelope.") - self.messagesManager.handleReceivedEnvelope(envelope, completion: { - // Don't acknowledge delivery until the envelope has been processed. - self.acknowledgeDelivery(envelope: envelope) - }) + self.messageReceiver.handleReceivedEnvelope(envelope) + self.acknowledgeDelivery(envelope: envelope) } if more { Logger.info("\(self.TAG) more messages, so recursing.") diff --git a/Signal/src/Signal-Bridging-Header.h b/Signal/src/Signal-Bridging-Header.h index 0a5a61e95..0e1c51d52 100644 --- a/Signal/src/Signal-Bridging-Header.h +++ b/Signal/src/Signal-Bridging-Header.h @@ -61,6 +61,7 @@ #import #import #import +#import #import #import #import diff --git a/Signal/src/network/PushManager.m b/Signal/src/network/PushManager.m index 47e08e7d0..0e44f98bc 100644 --- a/Signal/src/network/PushManager.m +++ b/Signal/src/network/PushManager.m @@ -10,10 +10,10 @@ #import "Signal-Swift.h" #import "ThreadUtil.h" #import +#import #import #import #import -#import #import #import @@ -54,14 +54,14 @@ NSString *const Signal_Message_MarkAsRead_Identifier = @"Signal_Message_MarkAsRe return [self initWithNetworkManager:[Environment getCurrent].networkManager storageManager:[TSStorageManager sharedManager] callUIAdapter:[Environment getCurrent].callService.callUIAdapter - messagesManager:[TSMessagesManager sharedManager] + messageReceiver:[OWSMessageReceiver sharedInstance] messageSender:[Environment getCurrent].messageSender]; } - (instancetype)initWithNetworkManager:(TSNetworkManager *)networkManager storageManager:(TSStorageManager *)storageManager callUIAdapter:(CallUIAdapter *)callUIAdapter - messagesManager:(TSMessagesManager *)messagesManager + messageReceiver:(OWSMessageReceiver *)messageReceiver messageSender:(OWSMessageSender *)messageSender { self = [super init]; @@ -73,7 +73,7 @@ NSString *const Signal_Message_MarkAsRead_Identifier = @"Signal_Message_MarkAsRe _messageSender = messageSender; OWSSignalService *signalService = [OWSSignalService sharedInstance]; - _messageFetcherJob = [[OWSMessageFetcherJob alloc] initWithMessagesManager:messagesManager + _messageFetcherJob = [[OWSMessageFetcherJob alloc] initWithMessageReceiver:messageReceiver networkManager:networkManager signalService:signalService]; diff --git a/SignalServiceKit/src/Messages/InvalidKeyMessages/TSInvalidIdentityKeyReceivingErrorMessage.m b/SignalServiceKit/src/Messages/InvalidKeyMessages/TSInvalidIdentityKeyReceivingErrorMessage.m index 24119ade8..85eefd652 100644 --- a/SignalServiceKit/src/Messages/InvalidKeyMessages/TSInvalidIdentityKeyReceivingErrorMessage.m +++ b/SignalServiceKit/src/Messages/InvalidKeyMessages/TSInvalidIdentityKeyReceivingErrorMessage.m @@ -5,6 +5,7 @@ #import "TSInvalidIdentityKeyReceivingErrorMessage.h" #import "OWSFingerprint.h" #import "OWSIdentityManager.h" +#import "OWSMessageReceiver.h" #import "TSContactThread.h" #import "TSDatabaseView.h" #import "TSErrorMessage_privateConstructor.h" @@ -17,6 +18,7 @@ NS_ASSUME_NONNULL_BEGIN +/// TODO we can eventually deprecate this, since incoming messages are now always decrypted. @interface TSInvalidIdentityKeyReceivingErrorMessage () @property (nonatomic, readonly, copy) NSString *authorId; @@ -65,7 +67,6 @@ NS_ASSUME_NONNULL_BEGIN return _envelope; } - - (void)acceptNewIdentityKey { if (self.errorType != TSErrorMessageWrongTrustedIdentityKey) { @@ -89,7 +90,7 @@ NS_ASSUME_NONNULL_BEGIN [self.thread receivedMessagesForInvalidKey:newKey]; for (TSInvalidIdentityKeyReceivingErrorMessage *errorMessage in messagesToDecrypt) { - [[TSMessagesManager sharedManager] handleReceivedEnvelope:errorMessage.envelope completion:nil]; + [[OWSMessageReceiver sharedInstance] handleReceivedEnvelope:errorMessage.envelope]; // Here we remove the existing error message because handleReceivedEnvelope will either // 1.) succeed and create a new successful message in the thread or... diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.h b/SignalServiceKit/src/Messages/OWSMessageReceiver.h new file mode 100644 index 000000000..6418f7b3c --- /dev/null +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.h @@ -0,0 +1,19 @@ +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// + +NS_ASSUME_NONNULL_BEGIN + +@class OWSSignalServiceProtosEnvelope; +@class YapDatabase; + +@interface OWSMessageReceiver : NSObject + ++ (instancetype)sharedInstance; ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database; + +- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope; + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m new file mode 100644 index 000000000..a5cdfc3f7 --- /dev/null +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -0,0 +1,338 @@ +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// + +#import "OWSMessageReceiver.h" +#import "OWSSignalServiceProtos.pb.h" +#import "TSDatabaseView.h" +#import "TSMessagesManager.h" +#import "TSStorageManager.h" +#import "TSYapDatabaseObject.h" +#import +#import +#import + +NS_ASSUME_NONNULL_BEGIN + +#pragma mark - Persisted data model +@class OWSSignalServiceProtosEnvelope; + +@interface OWSMessageProcessingJob : TSYapDatabaseObject + +@property (nonatomic, readonly) OWSSignalServiceProtosEnvelope *envelopeProto; +@property (nonatomic, readonly) NSDate *createdAt; + +- (instancetype)initWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithUniqueId:(NSString *)uniqueId NS_UNAVAILABLE; + +@end + +@interface OWSMessageProcessingJob () + +@property (nonatomic, readonly) NSData *envelopeData; + +@end + +// TODO rename? +@implementation OWSMessageProcessingJob + +- (instancetype)initWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope +{ + self = [super initWithUniqueId:[NSUUID new].UUIDString]; + if (!self) { + return self; + } + + _envelopeData = envelope.data; + _createdAt = [NSDate new]; + + return self; +} + +- (OWSSignalServiceProtosEnvelope *)envelopeProto +{ + return [OWSSignalServiceProtosEnvelope parseFromData:self.envelopeData]; +} + +@end + +#pragma mark - Finder + +NSString *const OWSMessageProcessingJobFinderExtensionName = @"OWSMessageProcessingJobFinderExtensionName"; +NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProcessingJobFinderExtensionGroup"; + +@interface OWSMessageProcessingJobFinder : NSObject + +- (nullable OWSMessageProcessingJob *)nextJob; +- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope; +- (void)removeJobWithId:(NSString *)uniqueId; + +@end + +@interface OWSMessageProcessingJobFinder () + +@property (nonatomic, readonly) YapDatabaseConnection *dbConnection; + +@end + +@implementation OWSMessageProcessingJobFinder + +- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + _dbConnection = dbConnection; + + return self; +} + +- (nullable OWSMessageProcessingJob *)nextJob +{ + __block OWSMessageProcessingJob *_Nullable job; + [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { + YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageProcessingJobFinderExtensionName]; + OWSAssert(viewTransaction != nil); + job = [viewTransaction firstObjectInGroup:OWSMessageProcessingJobFinderExtensionGroup]; + }]; + + return job; +} + +- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope +{ + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [[[OWSMessageProcessingJob alloc] initWithEnvelope:envelope] saveWithTransaction:transaction]; + }]; +} + +- (void)removeJobWithId:(NSString *)uniqueId +{ + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [transaction removeObjectForKey:uniqueId inCollection:[OWSMessageProcessingJob collection]]; + }]; +} + ++ (YapDatabaseView *)databaseExension +{ + + YapDatabaseViewSorting *sorting = + [YapDatabaseViewSorting withObjectBlock:^NSComparisonResult(YapDatabaseReadTransaction *transaction, + NSString *group, + NSString *collection1, + NSString *key1, + id object1, + NSString *collection2, + NSString *key2, + id object2) { + + if (![object1 isKindOfClass:[OWSMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", object1, collection1) return NSOrderedSame; + } + OWSMessageProcessingJob *job1 = (OWSMessageProcessingJob *)object1; + + if (![object2 isKindOfClass:[OWSMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", object2, collection2) return NSOrderedSame; + } + OWSMessageProcessingJob *job2 = (OWSMessageProcessingJob *)object2; + + return [job1.createdAt compare:job2.createdAt]; + }]; + + YapDatabaseViewGrouping *grouping = + [YapDatabaseViewGrouping withObjectBlock:^NSString *_Nullable(YapDatabaseReadTransaction *_Nonnull transaction, + NSString *_Nonnull collection, + NSString *_Nonnull key, + id _Nonnull object) { + if (![object isKindOfClass:[OWSMessageProcessingJob class]]) { + OWSFail(@"Unexpected object: %@ in collection: %@", object, collection) return nil; + } + + // Arbitrary string - all in the same group. We're only using the view for sorting. + return OWSMessageProcessingJobFinderExtensionGroup; + }]; + + YapDatabaseViewOptions *options = [YapDatabaseViewOptions new]; + options.allowedCollections = + [[YapWhitelistBlacklist alloc] initWithWhitelist:[NSSet setWithObject:[OWSMessageProcessingJob collection]]]; + + return [[YapDatabaseView alloc] initWithGrouping:grouping sorting:sorting versionTag:@"1" options:options]; +} + + ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database +{ + YapDatabaseView *existingView = [database registeredExtension:OWSMessageProcessingJobFinderExtensionName]; + if (existingView) { + // already initialized + return; + } + [database registerExtension:[self databaseExension] withName:OWSMessageProcessingJobFinderExtensionName]; +} + +@end + +#pragma mark - Queue Processing + +@interface OWSMessageProcessingQueue : NSObject + +@property (nonatomic, readonly) TSMessagesManager *messagesManager; +@property (nonatomic, readonly) OWSMessageProcessingJobFinder *finder; + +- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + finder:(OWSMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER; +- (instancetype)init NS_UNAVAILABLE; + +@end + +@implementation OWSMessageProcessingQueue + +- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + finder:(OWSMessageProcessingJobFinder *)finder +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + _messagesManager = messagesManager; + _finder = finder; + + return self; +} + +#pragma mark - instance methods + +- (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope +{ + [self.finder addJobForEnvelope:envelope]; +} + +- (void)drainQueue +{ + dispatch_async(self.class.serialGCDQueue, ^{ + OWSMessageProcessingJob *_Nullable job = [self.finder nextJob]; + if (job == nil) { + DDLogVerbose(@"%@ Queue is drained", self.tag); + return; + } + + [self processJob:job + completion:^{ + [self drainQueue]; + }]; + }); +} + +- (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion +{ + dispatch_async(dispatch_get_main_queue(), ^{ + [self.messagesManager processEnvelope:job.envelopeProto + completion:^{ + [self.finder removeJobWithId:job.uniqueId]; + completion(); + }]; + }); +} + +#pragma mark Helpers + ++ (dispatch_queue_t)serialGCDQueue +{ + static dispatch_once_t onceToken; + static dispatch_queue_t queue; + dispatch_once(&onceToken, ^{ + queue = dispatch_queue_create("org.whispersystems.signal.messageProcessingQueue", NULL); + }); + return queue; +} + +#pragma mark Logging + ++ (NSString *)tag +{ + return [NSString stringWithFormat:@"[%@]", self.class]; +} + +- (NSString *)tag +{ + return self.class.tag; +} + +@end + +#pragma mark - OWSMessageReceiver + +@interface OWSMessageReceiver () + +@property (nonatomic, readonly) OWSMessageProcessingQueue *processingQueue; +@property (nonatomic, readonly) YapDatabaseConnection *dbConnection; + +@end + +@implementation OWSMessageReceiver + +- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection + messagesManager:(TSMessagesManager *)messagesManager +{ + OWSSingletonAssert(); + + self = [super init]; + if (!self) { + return self; + } + + OWSMessageProcessingJobFinder *finder = [[OWSMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection]; + OWSMessageProcessingQueue *processingQueue = + [[OWSMessageProcessingQueue alloc] initWithMessagesManager:messagesManager finder:finder]; + + _processingQueue = processingQueue; + + return self; +} + +- (instancetype)initDefault +{ + // For concurrency coherency we use the same dbConnection to persist and read the unprocessed envelopes + YapDatabaseConnection *dbConnection = [[TSStorageManager sharedManager].database newConnection]; + TSMessagesManager *messagesManager = [TSMessagesManager sharedManager]; + + return [self initWithDBConnection:dbConnection messagesManager:messagesManager]; +} + ++ (instancetype)sharedInstance +{ + static OWSMessageReceiver *sharedInstance; + + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + sharedInstance = [[self alloc] initDefault]; + }); + + return sharedInstance; +} + +#pragma mark - class methods + ++ (void)syncRegisterDatabaseExtension:(YapDatabase *)database +{ + [OWSMessageProcessingJobFinder syncRegisterDatabaseExtension:database]; +} + +#pragma mark - instance methods + +- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope +{ + [self.processingQueue enqueueEnvelopeForProcessing:envelope]; + [self.processingQueue drainQueue]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/TSMessagesManager.h b/SignalServiceKit/src/Messages/TSMessagesManager.h index 0a0951841..e45e73737 100644 --- a/SignalServiceKit/src/Messages/TSMessagesManager.h +++ b/SignalServiceKit/src/Messages/TSMessagesManager.h @@ -28,15 +28,8 @@ typedef void (^MessageManagerCompletionBlock)(); @property (nonatomic, readonly) TSNetworkManager *networkManager; @property (nonatomic, readonly) ContactsUpdater *contactsUpdater; -- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope - completion:(nullable MessageManagerCompletionBlock)completion; - -/** - * @returns - * Group or Contact thread for message, creating a new one if necessary. - */ -- (TSThread *)threadForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope - dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage; +- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope + completion:(nullable MessageManagerCompletionBlock)completion; - (NSUInteger)unreadMessagesCount; - (NSUInteger)unreadMessagesCountExcept:(TSThread *)thread; diff --git a/SignalServiceKit/src/Messages/TSMessagesManager.m b/SignalServiceKit/src/Messages/TSMessagesManager.m index a5b6c8a76..ea6161d8c 100644 --- a/SignalServiceKit/src/Messages/TSMessagesManager.m +++ b/SignalServiceKit/src/Messages/TSMessagesManager.m @@ -290,8 +290,8 @@ NS_ASSUME_NONNULL_BEGIN #pragma mark - message handling -- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope - completion:(nullable MessageManagerCompletionBlock)completionHandler +- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope + completion:(nullable MessageManagerCompletionBlock)completionHandler { OWSAssert([NSThread isMainThread]); @@ -1098,6 +1098,10 @@ NS_ASSUME_NONNULL_BEGIN && dataMessage.group.hasAvatar; } +/** + * @returns + * Group or Contact thread for message, creating a new one if necessary. + */ - (TSThread *)threadForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage { diff --git a/SignalServiceKit/src/Network/WebSockets/TSSocketManager.m b/SignalServiceKit/src/Network/WebSockets/TSSocketManager.m index fae1b53cc..0c2631e0a 100644 --- a/SignalServiceKit/src/Network/WebSockets/TSSocketManager.m +++ b/SignalServiceKit/src/Network/WebSockets/TSSocketManager.m @@ -5,6 +5,7 @@ #import "TSSocketManager.h" #import "Cryptography.h" #import "NSTimer+OWS.h" +#import "OWSMessageReceiver.h" #import "OWSSignalService.h" #import "OWSWebsocketSecurityPolicy.h" #import "SubProtocol.pb.h" @@ -31,6 +32,7 @@ NSString *const kNSNotification_SocketManagerStateDidChange = @"kNSNotification_ @interface TSSocketManager () @property (nonatomic, readonly) OWSSignalService *signalService; +@property (nonatomic, readonly) OWSMessageReceiver *messageReceiver; // This class has a few "tiers" of state. // @@ -94,6 +96,7 @@ NSString *const kNSNotification_SocketManagerStateDidChange = @"kNSNotification_ OWSAssert([NSThread isMainThread]); _signalService = [OWSSignalService sharedInstance]; + _messageReceiver = [OWSMessageReceiver sharedInstance]; _state = SocketManagerStateClosed; _fetchingTaskIdentifier = UIBackgroundTaskInvalid; @@ -387,12 +390,9 @@ NSString *const kNSNotification_SocketManagerStateDidChange = @"kNSNotification_ OWSSignalServiceProtosEnvelope *envelope = [OWSSignalServiceProtosEnvelope parseFromData:decryptedPayload]; - [[TSMessagesManager sharedManager] handleReceivedEnvelope:envelope - completion:^{ - // Don't acknowledge delivery until the envelope has been - // processed. - [self sendWebSocketMessageAcknowledgement:message]; - }]; + [self.messageReceiver handleReceivedEnvelope:envelope]; + [self sendWebSocketMessageAcknowledgement:message]; + } else { DDLogWarn(@"Unsupported WebSocket Request"); diff --git a/SignalServiceKit/src/Storage/TSStorageManager.m b/SignalServiceKit/src/Storage/TSStorageManager.m index fdc24a78f..8cc7e89ac 100644 --- a/SignalServiceKit/src/Storage/TSStorageManager.m +++ b/SignalServiceKit/src/Storage/TSStorageManager.m @@ -18,6 +18,7 @@ #import "TSThread.h" #import <25519/Randomness.h> #import +#import #import NS_ASSUME_NONNULL_BEGIN @@ -206,6 +207,7 @@ static NSString *keychainDBPassAccount = @"TSDatabasePass"; [TSDatabaseView registerThreadInteractionsDatabaseView]; [TSDatabaseView registerUnreadDatabaseView]; [self.database registerExtension:[TSDatabaseSecondaryIndexes registerTimeStampIndex] withName:@"idx"]; + [OWSMessageReceiver syncRegisterDatabaseExtension:self.database]; // Run the blocking migrations. //