diff --git a/Signal.xcodeproj/project.pbxproj b/Signal.xcodeproj/project.pbxproj index 3733b6e78..ee360727c 100644 --- a/Signal.xcodeproj/project.pbxproj +++ b/Signal.xcodeproj/project.pbxproj @@ -444,6 +444,7 @@ 4CC0B59C20EC5F2E00CF6EE0 /* ConversationConfigurationSyncOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4CC0B59B20EC5F2E00CF6EE0 /* ConversationConfigurationSyncOperation.swift */; }; 4CC1ECF9211A47CE00CC13BE /* StoreKit.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 4CC1ECF8211A47CD00CC13BE /* StoreKit.framework */; settings = {ATTRIBUTES = (Weak, ); }; }; 4CC1ECFB211A553000CC13BE /* AppUpdateNag.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4CC1ECFA211A553000CC13BE /* AppUpdateNag.swift */; }; + 4CEB78C92178EBAB00F315D2 /* OWSSessionResetJobRecord.m in Sources */ = {isa = PBXBuildFile; fileRef = 4CEB78C82178EBAB00F315D2 /* OWSSessionResetJobRecord.m */; }; 70377AAB1918450100CAF501 /* MobileCoreServices.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 70377AAA1918450100CAF501 /* MobileCoreServices.framework */; }; 768A1A2B17FC9CD300E00ED8 /* libz.dylib in Frameworks */ = {isa = PBXBuildFile; fileRef = 768A1A2A17FC9CD300E00ED8 /* libz.dylib */; }; 76C87F19181EFCE600C4ACAB /* MediaPlayer.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 76C87F18181EFCE600C4ACAB /* MediaPlayer.framework */; }; @@ -1129,6 +1130,8 @@ 4CC0B59B20EC5F2E00CF6EE0 /* ConversationConfigurationSyncOperation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConversationConfigurationSyncOperation.swift; sourceTree = ""; }; 4CC1ECF8211A47CD00CC13BE /* StoreKit.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = StoreKit.framework; path = System/Library/Frameworks/StoreKit.framework; sourceTree = SDKROOT; }; 4CC1ECFA211A553000CC13BE /* AppUpdateNag.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppUpdateNag.swift; sourceTree = ""; }; + 4CEB78C72178EBAB00F315D2 /* OWSSessionResetJobRecord.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = OWSSessionResetJobRecord.h; sourceTree = ""; }; + 4CEB78C82178EBAB00F315D2 /* OWSSessionResetJobRecord.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = OWSSessionResetJobRecord.m; sourceTree = ""; }; 4CFF4C0920F55BBA005DA313 /* MenuActionsViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MenuActionsViewController.swift; sourceTree = ""; }; 69349DE607F5BA6036C9AC60 /* Pods-SignalShareExtension.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-SignalShareExtension.debug.xcconfig"; path = "Pods/Target Support Files/Pods-SignalShareExtension/Pods-SignalShareExtension.debug.xcconfig"; sourceTree = ""; }; 70377AAA1918450100CAF501 /* MobileCoreServices.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = MobileCoreServices.framework; path = System/Library/Frameworks/MobileCoreServices.framework; sourceTree = SDKROOT; }; @@ -2081,6 +2084,8 @@ 45CD81EE1DC030E7004C9430 /* SyncPushTokensJob.swift */, 452ECA4C1E087E7200E2F016 /* MessageFetcherJob.swift */, 4CC0B59B20EC5F2E00CF6EE0 /* ConversationConfigurationSyncOperation.swift */, + 4CEB78C72178EBAB00F315D2 /* OWSSessionResetJobRecord.h */, + 4CEB78C82178EBAB00F315D2 /* OWSSessionResetJobRecord.m */, ); path = Jobs; sourceTree = ""; @@ -3287,6 +3292,7 @@ EF764C351DB67CC5000D9A87 /* UIViewController+Permissions.m in Sources */, 45CD81EF1DC030E7004C9430 /* SyncPushTokensJob.swift in Sources */, 34D2CCE0206939B400CB1A14 /* DebugUIMessagesAssetLoader.m in Sources */, + 4CEB78C92178EBAB00F315D2 /* OWSSessionResetJobRecord.m in Sources */, 45794E861E00620000066731 /* CallUIAdapter.swift in Sources */, 340FC8BA204DAC8D007AEB0F /* FingerprintViewScanController.m in Sources */, 4585C4681ED8F8D200896AEA /* SafetyNumberConfirmationAlert.swift in Sources */, diff --git a/Signal/src/AppDelegate.m b/Signal/src/AppDelegate.m index 35ea2be66..899f00896 100644 --- a/Signal/src/AppDelegate.m +++ b/Signal/src/AppDelegate.m @@ -1108,6 +1108,11 @@ static NSTimeInterval launchStartedAt; [SSKEnvironment.shared.messageReceiver handleAnyUnprocessedEnvelopesAsync]; [SSKEnvironment.shared.batchMessageProcessor handleAnyUnprocessedEnvelopesAsync]; + // TODO + // - incorporate reachability check + [SSKEnvironment.shared.messageSenderJobQueue setup]; + [AppEnvironment.shared.sessionResetJobQueue setup]; + if (!Environment.shared.preferences.hasGeneratedThumbnails) { [self.primaryStorage.newDatabaseConnection asyncReadWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { diff --git a/Signal/src/Jobs/ConversationConfigurationSyncOperation.swift b/Signal/src/Jobs/ConversationConfigurationSyncOperation.swift index 90ad18a31..b5c50e227 100644 --- a/Signal/src/Jobs/ConversationConfigurationSyncOperation.swift +++ b/Signal/src/Jobs/ConversationConfigurationSyncOperation.swift @@ -15,8 +15,8 @@ class ConversationConfigurationSyncOperation: OWSOperation { return OWSPrimaryStorage.shared().dbReadConnection } - private var messageSender: MessageSender { - return SSKEnvironment.shared.messageSender + private var messageSenderJobQueue: MessageSenderJobQueue { + return SSKEnvironment.shared.messageSenderJobQueue } private var contactsManager: OWSContactsManager { @@ -84,15 +84,12 @@ class ConversationConfigurationSyncOperation: OWSOperation { } private func sendConfiguration(attachmentDataSource: DataSource, syncMessage: OWSOutgoingSyncMessage) { - self.messageSender.enqueueTemporaryAttachment(attachmentDataSource, - contentType: OWSMimeTypeApplicationOctetStream, - in: syncMessage, - success: { - self.reportSuccess() - }, - failure: { error in - self.reportError(error) - }) + self.messageSenderJobQueue.add(mediaMessage: syncMessage, + dataSource: attachmentDataSource, + contentType: OWSMimeTypeApplicationOctetStream, + sourceFilename: nil, + isTemporaryAttachment: true) + self.reportSuccess() } } diff --git a/Signal/src/Jobs/OWSSessionResetJobRecord.h b/Signal/src/Jobs/OWSSessionResetJobRecord.h new file mode 100644 index 000000000..642200fc6 --- /dev/null +++ b/Signal/src/Jobs/OWSSessionResetJobRecord.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import + +@class TSContactThread; + +NS_ASSUME_NONNULL_BEGIN + +@interface OWSSessionResetJobRecord : SSKJobRecord + +@property (nonatomic, readonly) NSString *contactThreadId; + +- (instancetype)initWithContactThread:(TSContactThread *)contactThread + label:(NSString *)label NS_DESIGNATED_INITIALIZER; +- (nullable instancetype)initWithCoder:(NSCoder *)coder NS_DESIGNATED_INITIALIZER; + +- (nullable)initWithLabel:(NSString *)label NS_UNAVAILABLE; + +@end + +NS_ASSUME_NONNULL_END diff --git a/Signal/src/Jobs/OWSSessionResetJobRecord.m b/Signal/src/Jobs/OWSSessionResetJobRecord.m new file mode 100644 index 000000000..e051a3b8d --- /dev/null +++ b/Signal/src/Jobs/OWSSessionResetJobRecord.m @@ -0,0 +1,31 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import "OWSSessionResetJobRecord.h" +#import + +NS_ASSUME_NONNULL_BEGIN + +@implementation OWSSessionResetJobRecord + +- (instancetype)initWithContactThread:(TSContactThread *)contactThread label:(NSString *)label +{ + self = [super initWithLabel:label]; + if (!self) { + return self; + } + + _contactThreadId = contactThread.uniqueId; + + return self; +} + +- (nullable instancetype)initWithCoder:(NSCoder *)coder +{ + return [super initWithCoder:coder]; +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/Signal/src/Jobs/SessionResetJob.swift b/Signal/src/Jobs/SessionResetJob.swift index 3d560c801..858c53ff1 100644 --- a/Signal/src/Jobs/SessionResetJob.swift +++ b/Signal/src/Jobs/SessionResetJob.swift @@ -6,67 +6,171 @@ import Foundation import PromiseKit import SignalServiceKit -@objc(OWSSessionResetJob) -public class SessionResetJob: NSObject { +@objc(OWSSessionResetJobQueue) +public class SessionResetJobQueue: NSObject, JobQueue { - let recipientId: String - let thread: TSThread - let primaryStorage: OWSPrimaryStorage - let messageSender: MessageSender - - @objc public required init(recipientId: String, thread: TSThread, messageSender: MessageSender, primaryStorage: OWSPrimaryStorage) { - self.thread = thread - self.recipientId = recipientId - self.messageSender = messageSender - self.primaryStorage = primaryStorage + @objc(addContactThread:transaction:) + public func add(contactThread: TSContactThread, transaction: YapDatabaseReadWriteTransaction) { + let jobRecord = OWSSessionResetJobRecord(contactThread: contactThread, label: self.jobRecordLabel) + self.add(jobRecord: jobRecord, transaction: transaction) } - func run() { - Logger.info("Local user reset session.") + // MARK: JobQueue - let dbConnection = OWSPrimaryStorage.shared().newDatabaseConnection() - dbConnection.asyncReadWrite { (transaction) in - Logger.info("deleting sessions for recipient: \(self.recipientId)") - self.primaryStorage.deleteAllSessions(forContact: self.recipientId, protocolContext: transaction) + public typealias DurableOperationType = SessionResetOperation + public let jobRecordLabel: String = "SessionReset" + public static let maxRetries: UInt = 10 - DispatchQueue.main.async { - let endSessionMessage = EndSessionMessage(timestamp: NSDate.ows_millisecondTimeStamp(), in: self.thread) + @objc + public func setup() { + defaultSetup() + } - self.messageSender.enqueue(endSessionMessage, success: { - dbConnection.asyncReadWrite { (transaction) in - // Archive the just-created session since the recipient should delete their corresponding - // session upon receiving and decrypting our EndSession message. - // Otherwise if we send another message before them, they wont have the session to decrypt it. - self.primaryStorage.archiveAllSessions(forContact: self.recipientId, protocolContext: transaction) - } - Logger.info("successfully sent EndSessionMessage.") - let message = TSInfoMessage(timestamp: NSDate.ows_millisecondTimeStamp(), - in: self.thread, - messageType: TSInfoMessageType.typeSessionDidEnd) - message.save() - }, failure: {error in - dbConnection.asyncReadWrite { (transaction) in - // Even though this is the error handler - which means probably the recipient didn't receive the message - // there's a chance that our send did succeed and the server just timed out our repsonse or something. - // Since the cost of sending a future message using a session the recipient doesn't have is so high, - // we archive the session just in case. - // - // Archive the just-created session since the recipient should delete their corresponding - // session upon receiving and decrypting our EndSession message. - // Otherwise if we send another message before them, they wont have the session to decrypt it. - self.primaryStorage.archiveAllSessions(forContact: self.recipientId, protocolContext: transaction) - } - Logger.error("failed to send EndSessionMessage with error: \(error.localizedDescription)") - }) - } + public var isReady: Bool = false { + didSet { + if isReady { + DispatchQueue.global().async { + self.workStep() + } } } + } - @objc public class func run(contactThread: TSContactThread, messageSender: MessageSender, primaryStorage: OWSPrimaryStorage) { - let job = self.init(recipientId: contactThread.contactIdentifier(), - thread: contactThread, - messageSender: messageSender, - primaryStorage: primaryStorage) - job.run() + public func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) { + // no special handling + } + + let operationQueue: OperationQueue = { + // no need to serialize the operation queuing, since sending will ultimately be serialized by MessageSender + let operationQueue = OperationQueue() + operationQueue.name = "SessionReset.OperationQueue" + return operationQueue + }() + + public func operationQueue(jobRecord: OWSSessionResetJobRecord) -> OperationQueue { + return self.operationQueue + } + + public func buildOperation(jobRecord: OWSSessionResetJobRecord, transaction: YapDatabaseReadTransaction) throws -> SessionResetOperation { + guard let contactThread = TSThread.fetch(uniqueId: jobRecord.contactThreadId, transaction: transaction) as? TSContactThread else { + throw JobError.obsolete(description: "thread for session reset no longer exists") + } + + return SessionResetOperation(contactThread: contactThread, jobRecord: jobRecord) + } +} + +@objc(OWSSessionResetJob) +public class SessionResetOperation: OWSOperation, DurableOperation { + + // MARK: DurableOperation + + public let jobRecord: OWSSessionResetJobRecord + + weak public var durableOperationDelegate: SessionResetJobQueue? + + public var operation: Operation { + return self + } + + // MARK: + + let contactThread: TSContactThread + var recipientId: String { + return contactThread.contactIdentifier() + } + + @objc public required init(contactThread: TSContactThread, jobRecord: OWSSessionResetJobRecord) { + self.contactThread = contactThread + self.jobRecord = jobRecord + } + + // MARK: Dependencies + + var dbConnection: YapDatabaseConnection { + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection + } + + var primaryStorage: OWSPrimaryStorage { + return SSKEnvironment.shared.primaryStorage + } + + var messageSender: MessageSender { + return SSKEnvironment.shared.messageSender + } + + // MARK: + + override public func run() { + assert(self.durableOperationDelegate != nil) + + self.dbConnection.readWrite { transaction in + Logger.info("deleting sessions for recipient: \(self.recipientId)") + self.primaryStorage.deleteAllSessions(forContact: self.recipientId, protocolContext: transaction) + } + + let endSessionMessage = EndSessionMessage(timestamp: NSDate.ows_millisecondTimeStamp(), in: self.contactThread) + + firstly { + return self.messageSender.sendPromise(message: endSessionMessage) + }.done { + Logger.info("successfully sent EndSessionMessage.") + self.dbConnection.readWrite { transaction in + // Archive the just-created session since the recipient should delete their corresponding + // session upon receiving and decrypting our EndSession message. + // Otherwise if we send another message before them, they wont have the session to decrypt it. + self.primaryStorage.archiveAllSessions(forContact: self.recipientId, protocolContext: transaction) + + let message = TSInfoMessage(timestamp: NSDate.ows_millisecondTimeStamp(), + in: self.contactThread, + messageType: TSInfoMessageType.typeSessionDidEnd) + message.save(with: transaction) + } + self.reportSuccess() + }.catch { error in + Logger.error("sending error: \(error.localizedDescription)") + self.reportError(error) + }.retainUntilComplete() + } + + override public func didSucceed() { + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperationDidSucceed(self, transaction: transaction) + } + } + + override public func didReportError(_ error: Error) { + Logger.debug("remainingRetries: \(self.remainingRetries)") + + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperation(self, didReportError: error, transaction: transaction) + } + } + + override public func retryDelay() -> dispatch_time_t { + // Arbitrary backoff factor... + // 10 failures, wait ~1min + let backoffFactor = 1.9 + let maxBackoff = kHourInterval + + let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) + return UInt64(seconds) * NSEC_PER_SEC + } + + override public func didFail(error: Error) { + Logger.error("failed to send EndSessionMessage with error: \(error.localizedDescription)") + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperation(self, didFailWithError: error, transaction: transaction) + + // Even though this is the failure handler - which means probably the recipient didn't receive the message + // there's a chance that our send did succeed and the server just timed out our repsonse or something. + // Since the cost of sending a future message using a session the recipient doesn't have is so high, + // we archive the session just in case. + // + // Archive the just-created session since the recipient should delete their corresponding + // session upon receiving and decrypting our EndSession message. + // Otherwise if we send another message before them, they wont have the session to decrypt it. + self.primaryStorage.archiveAllSessions(forContact: self.recipientId, protocolContext: transaction) + } } } diff --git a/Signal/src/MessageSender+Promise.swift b/Signal/src/MessageSender+Promise.swift index 5217f3b34..e6f078918 100644 --- a/Signal/src/MessageSender+Promise.swift +++ b/Signal/src/MessageSender+Promise.swift @@ -13,7 +13,7 @@ public extension MessageSender { */ public func sendPromise(message: TSOutgoingMessage) -> Promise { let promise: Promise = Promise { resolver in - self.enqueue(message, success: resolver.fulfill, failure: resolver.reject) + self.send(message, success: resolver.fulfill, failure: resolver.reject) } // Ensure sends complete before they're GC'd. diff --git a/Signal/src/Signal-Bridging-Header.h b/Signal/src/Signal-Bridging-Header.h index 3c01da122..c63712043 100644 --- a/Signal/src/Signal-Bridging-Header.h +++ b/Signal/src/Signal-Bridging-Header.h @@ -35,6 +35,7 @@ #import "OWSNavigationController.h" #import "OWSProgressView.h" #import "OWSQuotedMessageView.h" +#import "OWSSessionResetJobRecord.h" #import "OWSWindowManager.h" #import "PinEntryView.h" #import "PrivacySettingsTableViewController.h" diff --git a/Signal/src/ViewControllers/ConversationView/ConversationViewController.m b/Signal/src/ViewControllers/ConversationView/ConversationViewController.m index e863cdd5c..69be10fff 100644 --- a/Signal/src/ViewControllers/ConversationView/ConversationViewController.m +++ b/Signal/src/ViewControllers/ConversationView/ConversationViewController.m @@ -289,6 +289,20 @@ typedef enum : NSUInteger { _voiceNoteAudioActivity = [[AudioActivity alloc] initWithAudioDescription:audioActivityDescription]; } +#pragma mark - Dependencies + +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return SSKEnvironment.shared.messageSenderJobQueue; +} + +- (OWSSessionResetJobQueue *)sessionResetJobQueue +{ + return AppEnvironment.shared.sessionResetJobQueue; +} + +#pragma mark + - (void)addNotificationListeners { [[NSNotificationCenter defaultCenter] addObserver:self @@ -1790,18 +1804,15 @@ typedef enum : NSUInteger { }]; [actionSheetController addAction:deleteMessageAction]; - UIAlertAction *resendMessageAction = - [UIAlertAction actionWithTitle:NSLocalizedString(@"SEND_AGAIN_BUTTON", @"") - style:UIAlertActionStyleDefault - handler:^(UIAlertAction *action) { - [self.messageSender enqueueMessage:message - success:^{ - OWSLogInfo(@"Successfully resent failed message."); - } - failure:^(NSError *error) { - OWSLogWarn(@"Failed to send message with error: %@", error); - }]; - }]; + UIAlertAction *resendMessageAction = [UIAlertAction + actionWithTitle:NSLocalizedString(@"SEND_AGAIN_BUTTON", @"") + style:UIAlertActionStyleDefault + handler:^(UIAlertAction *action) { + [self.editingDatabaseConnection + asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; + }]; [actionSheetController addAction:resendMessageAction]; @@ -1850,9 +1861,10 @@ typedef enum : NSUInteger { return; } TSContactThread *contactThread = (TSContactThread *)self.thread; - [OWSSessionResetJob runWithContactThread:contactThread - messageSender:self.messageSender - primaryStorage:self.primaryStorage]; + [self.editingDatabaseConnection + asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.sessionResetJobQueue addContactThread:contactThread transaction:transaction]; + }]; }]; [alertController addAction:resetSessionAction]; @@ -3099,11 +3111,9 @@ typedef enum : NSUInteger { [attachment mimeType]); BOOL didAddToProfileWhitelist = [ThreadUtil addThreadToProfileWhitelistIfEmptyContactThread:self.thread]; - TSOutgoingMessage *message = [ThreadUtil sendMessageWithAttachment:attachment - inThread:self.thread - quotedReplyModel:self.inputToolbar.quotedReply - messageSender:self.messageSender - completion:nil]; + TSOutgoingMessage *message = [ThreadUtil enqueueMessageWithAttachment:attachment + inThread:self.thread + quotedReplyModel:self.inputToolbar.quotedReply]; [self messageWasSent:message]; @@ -3121,16 +3131,17 @@ typedef enum : NSUInteger { BOOL didAddToProfileWhitelist = [ThreadUtil addThreadToProfileWhitelistIfEmptyContactThread:self.thread]; - [self.editingDatabaseConnection asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - if (contactShare.avatarImage) { - [contactShare.dbRecord saveAvatarImage:contactShare.avatarImage transaction:transaction]; + [self.editingDatabaseConnection + asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + // TODO - in line with QuotedReply and other message attachments, saving should happen as part of sending + // preparation rather than duplicated here and in the SAE + if (contactShare.avatarImage) { + [contactShare.dbRecord saveAvatarImage:contactShare.avatarImage transaction:transaction]; + } } - } completionBlock:^{ - TSOutgoingMessage *message = [ThreadUtil sendMessageWithContactShare:contactShare.dbRecord - inThread:self.thread - messageSender:self.messageSender - completion:nil]; + TSOutgoingMessage *message = + [ThreadUtil enqueueMessageWithContactShare:contactShare.dbRecord inThread:self.thread]; [self messageWasSent:message]; if (didAddToProfileWhitelist) { @@ -3956,7 +3967,10 @@ typedef enum : NSUInteger { if (newGroupModel.groupImage) { NSData *data = UIImagePNGRepresentation(newGroupModel.groupImage); DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithData:data fileExtension:@"png"]; - [self.messageSender enqueueTemporaryAttachment:dataSource + // DURABLE CLEANUP - currently one caller uses the completion handler to delete the tappable error message + // which causes this code to be called. Once we're more aggressive about durable sending retry, + // we could get rid of this "retryable tappable error message". + [self.messageSender sendTemporaryAttachment:dataSource contentType:OWSMimeTypeImagePng inMessage:message success:^{ @@ -3969,7 +3983,10 @@ typedef enum : NSUInteger { OWSLogError(@"Failed to send group avatar update with error: %@", error); }]; } else { - [self.messageSender enqueueMessage:message + // DURABLE CLEANUP - currently one caller uses the completion handler to delete the tappable error message + // which causes this code to be called. Once we're more aggressive about durable sending retry, + // we could get rid of this "retryable tappable error message". + [self.messageSender sendMessage:message success:^{ OWSLogDebug(@"Successfully sent group update"); if (successCompletion) { @@ -4442,7 +4459,7 @@ typedef enum : NSUInteger { // We convert large text messages to attachments // which are presented as normal text messages. BOOL didAddToProfileWhitelist = [ThreadUtil addThreadToProfileWhitelistIfEmptyContactThread:self.thread]; - TSOutgoingMessage *message; + __block TSOutgoingMessage *message; if ([text lengthOfBytesUsingEncoding:NSUTF8StringEncoding] >= kOversizeTextMessageSizeThreshold) { DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithOversizeText:text]; @@ -4451,16 +4468,16 @@ typedef enum : NSUInteger { // TODO we should redundantly send the first n chars in the body field so it can be viewed // on clients that don't support oversized text messgaes, (and potentially generate a preview // before the attachment is downloaded) - message = [ThreadUtil sendMessageWithAttachment:attachment - inThread:self.thread - quotedReplyModel:self.inputToolbar.quotedReply - messageSender:self.messageSender - completion:nil]; + message = [ThreadUtil enqueueMessageWithAttachment:attachment + inThread:self.thread + quotedReplyModel:self.inputToolbar.quotedReply]; } else { - message = [ThreadUtil sendMessageWithText:text - inThread:self.thread - quotedReplyModel:self.inputToolbar.quotedReply - messageSender:self.messageSender]; + [self.editingDatabaseConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + message = [ThreadUtil enqueueMessageWithText:text + inThread:self.thread + quotedReplyModel:self.inputToolbar.quotedReply + transaction:transaction]; + }]; } [self messageWasSent:message]; diff --git a/Signal/src/ViewControllers/DebugUI/DebugUIMessages.m b/Signal/src/ViewControllers/DebugUI/DebugUIMessages.m index 9c5f1e91d..ebab8c895 100644 --- a/Signal/src/ViewControllers/DebugUI/DebugUIMessages.m +++ b/Signal/src/ViewControllers/DebugUI/DebugUIMessages.m @@ -243,13 +243,10 @@ NS_ASSUME_NONNULL_BEGIN OWSSyncGroupsRequestMessage *syncGroupsRequestMessage = [[OWSSyncGroupsRequestMessage alloc] initWithThread:thread groupId:[Randomness generateRandomBytes:kGroupIdLength]]; - [SSKEnvironment.shared.messageSender enqueueMessage:syncGroupsRequestMessage - success:^{ - OWSLogWarn(@"Successfully sent Request Group Info message."); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send Request Group Info message with error: %@", error); - }]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:syncGroupsRequestMessage transaction:transaction]; + }]; + }], [OWSTableItem itemWithTitle:@"Message with stalled timer" actionBlock:^{ @@ -312,6 +309,28 @@ NS_ASSUME_NONNULL_BEGIN #ifdef DEBUG +#pragma mark - Dependencies + +- (YapDatabaseConnection *)dbConnection +{ + return self.class.dbConnection; +} + ++ (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} + ++ (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return SSKEnvironment.shared.messageSenderJobQueue; +} + +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return self.class.messageSenderJobQueue; +} + + (void)sendMessages:(NSUInteger)count toAllMembersOfGroup:(TSGroupThread *)groupThread { for (NSString *recipientId in groupThread.groupModel.groupMemberIds) { @@ -327,9 +346,10 @@ NS_ASSUME_NONNULL_BEGIN NSString *randomText = [self randomText]; NSString *text = [[[@(counter) description] stringByAppendingString:@" "] stringByAppendingString:randomText]; - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - TSOutgoingMessage *message = - [ThreadUtil sendMessageWithText:text inThread:thread quotedReplyModel:nil messageSender:messageSender]; + __block TSOutgoingMessage *message; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + message = [ThreadUtil enqueueMessageWithText:text inThread:thread quotedReplyModel:nil transaction:transaction]; + }]; OWSLogError(@"sendTextMessageInThread timestamp: %llu.", message.timestamp); } @@ -365,7 +385,6 @@ NS_ASSUME_NONNULL_BEGIN OWSAssertDebug(filePath); OWSAssertDebug(thread); - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; NSString *filename = [filePath lastPathComponent]; NSString *utiType = [MIMETypeUtil utiTypeForFileExtension:filename.pathExtension]; DataSource *_Nullable dataSource = [DataSourcePath dataSourceWithFilePath:filePath shouldDeleteOnDeallocation:NO]; @@ -391,11 +410,7 @@ NS_ASSUME_NONNULL_BEGIN [DDLog flushLog]; } OWSAssertDebug(![attachment hasError]); - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; success(); } @@ -1710,12 +1725,7 @@ NS_ASSUME_NONNULL_BEGIN OWSAssertDebug(thread); SignalAttachment *attachment = [self signalAttachmentForFilePath:filePath]; - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; success(); } @@ -3119,8 +3129,7 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac ActionFailureBlock failure) { OWSContact *contact = contactBlock(transaction); OWSLogVerbose(@"sending contact: %@", contact.debugDescription); - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - [ThreadUtil sendMessageWithContactShare:contact inThread:thread messageSender:messageSender completion:nil]; + [ThreadUtil enqueueMessageWithContactShare:contact inThread:thread]; success(); }]; @@ -3311,16 +3320,11 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac + (void)sendOversizeTextMessage:(TSThread *)thread { - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; NSString *message = [self randomOversizeText]; DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithOversizeText:message]; SignalAttachment *attachment = [SignalAttachment attachmentWithDataSource:dataSource dataUTI:kOversizeTextAttachmentUTI]; - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; } + (NSData *)createRandomNSDataOfSize:(size_t)size @@ -3343,7 +3347,6 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac + (void)sendRandomAttachment:(TSThread *)thread uti:(NSString *)uti length:(NSUInteger)length { - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithData:[self createRandomNSDataOfSize:length] utiType:uti]; SignalAttachment *attachment = @@ -3354,12 +3357,7 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac // style them indistinguishably from a separate text message. attachment.captionText = [self randomCaptionText]; } - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - ignoreErrors:YES - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil ignoreErrors:YES]; } + (SSKProtoEnvelope *)createEnvelopeForThread:(TSThread *)thread @@ -3822,33 +3820,29 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac [[TSGroupModel alloc] initWithTitle:groupName memberIds:recipientIds image:nil groupId:groupId]; __block TSGroupThread *thread; - [OWSPrimaryStorage.dbReadWriteConnection - readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { - thread = [TSGroupThread getOrCreateThreadWithGroupModel:groupModel transaction:transaction]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + thread = [TSGroupThread getOrCreateThreadWithGroupModel:groupModel transaction:transaction]; + OWSAssertDebug(thread); + + TSOutgoingMessage *message = [TSOutgoingMessage outgoingMessageInThread:thread + groupMetaMessage:TSGroupMetaMessageNew + expiresInSeconds:0]; + [message updateWithCustomMessage:NSLocalizedString(@"GROUP_CREATED", nil) transaction:transaction]; + + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; + + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)1.f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [ThreadUtil enqueueMessageWithText:[@(counter) description] + inThread:thread + quotedReplyModel:nil + transaction:transaction]; }]; - OWSAssertDebug(thread); - - TSOutgoingMessage *message = - [TSOutgoingMessage outgoingMessageInThread:thread groupMetaMessage:TSGroupMetaMessageNew expiresInSeconds:0]; - [message updateWithCustomMessage:NSLocalizedString(@"GROUP_CREATED", nil)]; - - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - void (^completion)(void) = ^{ dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)1.f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ - [ThreadUtil sendMessageWithText:[@(counter) description] - inThread:thread - quotedReplyModel:nil - messageSender:messageSender]; - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)1.f * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ - [self createNewGroups:counter - 1 recipientId:recipientId]; - }); + [self createNewGroups:counter - 1 recipientId:recipientId]; }); - }; - [messageSender enqueueMessage:message - success:completion - failure:^(NSError *error) { - completion(); - }]; + }); } + (void)injectFakeIncomingMessages:(NSUInteger)counter thread:(TSThread *)thread @@ -4375,7 +4369,6 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac } NSString *filename = filenames.lastObject; [filenames removeLastObject]; - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; NSString *utiType = (NSString *)kUTTypeData; const NSUInteger kDataLength = 32; DataSource *_Nullable dataSource = @@ -4390,11 +4383,7 @@ typedef OWSContact * (^OWSContactBlock)(YapDatabaseReadWriteTransaction *transac [DDLog flushLog]; } OWSAssertDebug(![attachment hasError]); - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 1 * NSEC_PER_SEC), dispatch_get_main_queue(), ^{ sendUnsafeFile(); diff --git a/Signal/src/ViewControllers/DebugUI/DebugUIMisc.m b/Signal/src/ViewControllers/DebugUI/DebugUIMisc.m index ef4c229a9..f39486aa4 100644 --- a/Signal/src/ViewControllers/DebugUI/DebugUIMisc.m +++ b/Signal/src/ViewControllers/DebugUI/DebugUIMisc.m @@ -230,7 +230,6 @@ NS_ASSUME_NONNULL_BEGIN return; } - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; NSString *utiType = [MIMETypeUtil utiTypeForFileExtension:fileName.pathExtension]; DataSource *_Nullable dataSource = [DataSourcePath dataSourceWithFilePath:filePath shouldDeleteOnDeallocation:YES]; [dataSource setSourceFilename:fileName]; @@ -241,11 +240,7 @@ NS_ASSUME_NONNULL_BEGIN OWSFailDebug(@"attachment[%@]: %@", [attachment sourceFilename], [attachment errorName]); return; } - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; } + (void)sendUnencryptedDatabase:(TSThread *)thread @@ -259,7 +254,6 @@ NS_ASSUME_NONNULL_BEGIN return; } - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; NSString *utiType = [MIMETypeUtil utiTypeForFileExtension:fileName.pathExtension]; DataSource *_Nullable dataSource = [DataSourcePath dataSourceWithFilePath:filePath shouldDeleteOnDeallocation:YES]; [dataSource setSourceFilename:fileName]; @@ -268,11 +262,7 @@ NS_ASSUME_NONNULL_BEGIN OWSFailDebug(@"attachment[%@]: %@", [attachment sourceFilename], [attachment errorName]); return; } - [ThreadUtil sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:nil - messageSender:messageSender - completion:nil]; + [ThreadUtil enqueueMessageWithAttachment:attachment inThread:thread quotedReplyModel:nil]; } @end diff --git a/Signal/src/ViewControllers/DebugUI/DebugUISessionState.m b/Signal/src/ViewControllers/DebugUI/DebugUISessionState.m index 9d54b9d81..91856eac5 100644 --- a/Signal/src/ViewControllers/DebugUI/DebugUISessionState.m +++ b/Signal/src/ViewControllers/DebugUI/DebugUISessionState.m @@ -51,16 +51,15 @@ NS_ASSUME_NONNULL_BEGIN }], [OWSTableItem itemWithTitle:@"Delete all sessions" actionBlock:^{ - [OWSPrimaryStorage.sharedManager.newDatabaseConnection - readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [[OWSPrimaryStorage sharedManager] - deleteAllSessionsForContact:thread.contactIdentifier - protocolContext:transaction]; - }]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [[OWSPrimaryStorage sharedManager] + deleteAllSessionsForContact:thread.contactIdentifier + protocolContext:transaction]; + }]; }], [OWSTableItem itemWithTitle:@"Archive all sessions" actionBlock:^{ - [OWSPrimaryStorage.sharedManager.newDatabaseConnection + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [[OWSPrimaryStorage sharedManager] archiveAllSessionsForContact:thread.contactIdentifier @@ -69,9 +68,9 @@ NS_ASSUME_NONNULL_BEGIN }], [OWSTableItem itemWithTitle:@"Send session reset" actionBlock:^{ - [OWSSessionResetJob runWithContactThread:thread - messageSender:SSKEnvironment.shared.messageSender - primaryStorage:[OWSPrimaryStorage sharedManager]]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self.sessionResetJobQueue addContactThread:thread transaction:transaction]; + }]; }], ]]; } @@ -96,6 +95,18 @@ NS_ASSUME_NONNULL_BEGIN return [OWSTableSection sectionWithTitle:self.name items:items]; } +#pragma mark - Dependencies + +- (OWSSessionResetJobQueue *)sessionResetJobQueue +{ + return AppEnvironment.shared.sessionResetJobQueue; +} + +- (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} + #if DEBUG + (void)clearSessionAndIdentityStore { diff --git a/Signal/src/ViewControllers/DebugUI/DebugUIStress.m b/Signal/src/ViewControllers/DebugUI/DebugUIStress.m index c47cd87a2..fe9a9417c 100644 --- a/Signal/src/ViewControllers/DebugUI/DebugUIStress.m +++ b/Signal/src/ViewControllers/DebugUI/DebugUIStress.m @@ -21,6 +21,28 @@ NS_ASSUME_NONNULL_BEGIN @implementation DebugUIStress +#pragma mark - Dependencies + ++ (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return SSKEnvironment.shared.messageSenderJobQueue; +} + +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return self.class.messageSenderJobQueue; +} + +- (YapDatabaseConnection *)dbConnection +{ + return self.class.dbConnection; +} + ++ (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} + #pragma mark - Factory Methods - (NSString *)name @@ -462,14 +484,9 @@ NS_ASSUME_NONNULL_BEGIN { OWSAssertDebug(message); - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - [messageSender enqueueMessage:message - success:^{ - OWSLogInfo(@"Successfully sent message."); - } - failure:^(NSError *error) { - OWSLogWarn(@"Failed to deliver message with error: %@", error); - }]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; } + (void)sendStressMessage:(TSThread *)thread diff --git a/Signal/src/ViewControllers/DebugUI/DebugUISyncMessages.m b/Signal/src/ViewControllers/DebugUI/DebugUISyncMessages.m index 44ce3db83..be87bfca6 100644 --- a/Signal/src/ViewControllers/DebugUI/DebugUISyncMessages.m +++ b/Signal/src/ViewControllers/DebugUI/DebugUISyncMessages.m @@ -20,6 +20,7 @@ #import #import #import +#import #import #import #import @@ -60,9 +61,9 @@ NS_ASSUME_NONNULL_BEGIN return [OWSTableSection sectionWithTitle:self.name items:items]; } -+ (OWSMessageSender *)messageSender ++ (SSKMessageSenderJobQueue *)messageSenderJobQueue { - return SSKEnvironment.shared.messageSender; + return SSKEnvironment.shared.messageSenderJobQueue; } + (OWSContactsManager *)contactsManager @@ -97,6 +98,8 @@ NS_ASSUME_NONNULL_BEGIN return SSKEnvironment.shared.syncManager; } +#pragma mark - + + (void)sendContactsSyncMessage { [self.syncManager syncAllContacts]; @@ -110,15 +113,12 @@ NS_ASSUME_NONNULL_BEGIN dataSource = [DataSourceValue dataSourceWithSyncMessageData:[syncGroupsMessage buildPlainTextAttachmentDataWithTransaction:transaction]]; }]; - [self.messageSender enqueueTemporaryAttachment:dataSource - contentType:OWSMimeTypeApplicationOctetStream - inMessage:syncGroupsMessage - success:^{ - OWSLogInfo(@"Successfully sent Groups response syncMessage."); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send Groups response syncMessage with error: %@", error); - }]; + + [self.messageSenderJobQueue addMediaMessage:syncGroupsMessage + dataSource:dataSource + contentType:OWSMimeTypeApplicationOctetStream + sourceFilename:nil + isTemporaryAttachment:YES]; } + (void)sendBlockListSyncMessage diff --git a/Signal/src/ViewControllers/HomeView/HomeViewController.m b/Signal/src/ViewControllers/HomeView/HomeViewController.m index 571ba004a..cedca221d 100644 --- a/Signal/src/ViewControllers/HomeView/HomeViewController.m +++ b/Signal/src/ViewControllers/HomeView/HomeViewController.m @@ -1111,32 +1111,22 @@ NSString *const kArchivedConversationsReuseIdentifier = @"kArchivedConversations TSThread *thread = [self threadForIndexPath:indexPath]; - if ([thread isKindOfClass:[TSGroupThread class]]) { - TSGroupThread *gThread = (TSGroupThread *)thread; - if ([gThread.groupModel.groupMemberIds containsObject:[TSAccountManager localNumber]]) { - [ThreadUtil sendLeaveGroupMessageInThread:gThread - presentingViewController:self - messageSender:self.messageSender - completion:^(NSError *_Nullable error) { - if (error) { - NSString *title = NSLocalizedString(@"GROUP_REMOVING_FAILED", - @"Title of alert indicating that group deletion failed."); - - [OWSAlerts showAlertWithTitle:title - message:error.localizedRecoverySuggestion]; - return; - } - - [self deleteThread:thread]; - }]; - } else { - // MJK - turn these trailing elses into guards - [self deleteThread:thread]; - } - } else { - // MJK - turn these trailing elses into guards + if (![thread isKindOfClass:[TSGroupThread class]]) { [self deleteThread:thread]; + return; } + + TSGroupThread *gThread = (TSGroupThread *)thread; + if (![gThread.groupModel.groupMemberIds containsObject:[TSAccountManager localNumber]]) { + [self deleteThread:thread]; + return; + } + + [ThreadUtil enqueueLeaveGroupMessageInThread:gThread]; + + // MJK TODO - DURABLE TESTPLAN is this safe to delete the gThread when the outgoing message hasn't completed + // sending? + [self deleteThread:thread]; } - (void)deleteThread:(TSThread *)thread diff --git a/Signal/src/ViewControllers/NewGroupViewController.m b/Signal/src/ViewControllers/NewGroupViewController.m index 33bda1c69..baf2970e7 100644 --- a/Signal/src/ViewControllers/NewGroupViewController.m +++ b/Signal/src/ViewControllers/NewGroupViewController.m @@ -490,13 +490,17 @@ NS_ASSUME_NONNULL_BEGIN NSData *data = UIImagePNGRepresentation(model.groupImage); DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithData:data fileExtension:@"png"]; - [self.messageSender enqueueTemporaryAttachment:dataSource - contentType:OWSMimeTypeImagePng - inMessage:message - success:successHandler - failure:failureHandler]; + // CLEANUP DURABLE - Replace with a durable operation e.g. `GroupCreateJob`, which creates + // an error in the thread if group creation fails + [self.messageSender sendTemporaryAttachment:dataSource + contentType:OWSMimeTypeImagePng + inMessage:message + success:successHandler + failure:failureHandler]; } else { - [self.messageSender enqueueMessage:message success:successHandler failure:failureHandler]; + // CLEANUP DURABLE - Replace with a durable operation e.g. `GroupCreateJob`, which creates + // an error in the thread if group creation fails + [self.messageSender sendMessage:message success:successHandler failure:failureHandler]; } }); }]; diff --git a/Signal/src/ViewControllers/ThreadSettings/OWSConversationSettingsViewController.m b/Signal/src/ViewControllers/ThreadSettings/OWSConversationSettingsViewController.m index 72abfc8ab..4135b7d4d 100644 --- a/Signal/src/ViewControllers/ThreadSettings/OWSConversationSettingsViewController.m +++ b/Signal/src/ViewControllers/ThreadSettings/OWSConversationSettingsViewController.m @@ -28,7 +28,6 @@ #import #import #import -#import #import #import #import @@ -125,6 +124,15 @@ const CGFloat kIconViewLength = 24; [[NSNotificationCenter defaultCenter] removeObserver:self]; } +#pragma mark - Dependencies + +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return SSKEnvironment.shared.messageSenderJobQueue; +} + +#pragma mark + - (void)observeNotifications { [[NSNotificationCenter defaultCenter] addObserver:self @@ -913,20 +921,23 @@ const CGFloat kIconViewLength = 24; } if (self.disappearingMessagesConfiguration.dictionaryValueDidChange) { - [self.disappearingMessagesConfiguration save]; - OWSDisappearingConfigurationUpdateInfoMessage *infoMessage = - [[OWSDisappearingConfigurationUpdateInfoMessage alloc] - initWithTimestamp:[NSDate ows_millisecondTimeStamp] - thread:self.thread - configuration:self.disappearingMessagesConfiguration - createdByRemoteName:nil - createdInExistingGroup:NO]; - [infoMessage save]; + [self.editingDatabaseConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.disappearingMessagesConfiguration saveWithTransaction:transaction]; + OWSDisappearingConfigurationUpdateInfoMessage *infoMessage = + [[OWSDisappearingConfigurationUpdateInfoMessage alloc] + initWithTimestamp:[NSDate ows_millisecondTimeStamp] + thread:self.thread + configuration:self.disappearingMessagesConfiguration + createdByRemoteName:nil + createdInExistingGroup:NO]; + [infoMessage saveWithTransaction:transaction]; - [OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob - runWithConfiguration:self.disappearingMessagesConfiguration - thread:self.thread - messageSender:self.messageSender]; + OWSDisappearingMessagesConfigurationMessage *message = [[OWSDisappearingMessagesConfigurationMessage alloc] + initWithConfiguration:self.disappearingMessagesConfiguration + thread:self.thread]; + + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; } } @@ -1042,16 +1053,9 @@ const CGFloat kIconViewLength = 24; TSGroupThread *gThread = (TSGroupThread *)self.thread; TSOutgoingMessage *message = [TSOutgoingMessage outgoingMessageInThread:gThread groupMetaMessage:TSGroupMetaMessageQuit expiresInSeconds:0]; - [self.messageSender enqueueMessage:message - success:^{ - OWSLogInfo(@"Successfully left group."); - } - failure:^(NSError *error) { - OWSLogWarn(@"Failed to leave group with error: %@", error); - }]; - [self.editingDatabaseConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; [gThread leaveGroupWithTransaction:transaction]; }]; diff --git a/Signal/src/environment/AppEnvironment.swift b/Signal/src/environment/AppEnvironment.swift index e35a59b01..6313393c1 100644 --- a/Signal/src/environment/AppEnvironment.swift +++ b/Signal/src/environment/AppEnvironment.swift @@ -52,6 +52,9 @@ import SignalMessaging @objc public var pushManager: PushManager + @objc + public var sessionResetJobQueue: SessionResetJobQueue + private override init() { self.callMessageHandler = WebRTCCallMessageHandler() self.callService = CallService() @@ -62,6 +65,7 @@ import SignalMessaging self.callNotificationsAdapter = CallNotificationsAdapter() self.pushRegistrationManager = PushRegistrationManager() self.pushManager = PushManager() + self.sessionResetJobQueue = SessionResetJobQueue() super.init() diff --git a/Signal/src/network/PushManager.m b/Signal/src/network/PushManager.m index 7031724e0..59edfd871 100644 --- a/Signal/src/network/PushManager.m +++ b/Signal/src/network/PushManager.m @@ -201,7 +201,10 @@ NSString *const Signal_Message_MarkAsRead_Identifier = @"Signal_Message_MarkAsRe NSString *replyText = responseInfo[UIUserNotificationActionResponseTypedTextKey]; // In line with most apps, we send a normal outgoing messgae here - not a "quoted reply". - [ThreadUtil sendMessageWithText:replyText + + // We use a non-durable send to delay calling the completion handler until sending completes + // in hopes our send will complete before the app gets suspended. + [ThreadUtil sendMessageNonDurablyWithText:replyText inThread:thread quotedReplyModel:nil messageSender:self.messageSender diff --git a/Signal/src/util/Pastelog.m b/Signal/src/util/Pastelog.m index 582f244d9..d67fed6e3 100644 --- a/Signal/src/util/Pastelog.m +++ b/Signal/src/util/Pastelog.m @@ -283,6 +283,15 @@ typedef void (^DebugLogUploadFailure)(DebugLogUploader *uploader, NSError *error return self; } +#pragma mark - Dependencies + +- (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} + +#pragma mark - + + (void)submitLogs { [self submitLogsWithCompletion:nil]; @@ -565,17 +574,18 @@ typedef void (^DebugLogUploadFailure)(DebugLogUploader *uploader, NSError *error return; } NSString *recipientId = [TSAccountManager localNumber]; - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; DispatchMainThreadSafe(^{ __block TSThread *thread = nil; [OWSPrimaryStorage.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { thread = [TSContactThread getOrCreateThreadWithContactId:recipientId transaction:transaction]; }]; - [ThreadUtil sendMessageWithText:url.absoluteString - inThread:thread - quotedReplyModel:nil - messageSender:messageSender]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [ThreadUtil enqueueMessageWithText:url.absoluteString + inThread:thread + quotedReplyModel:nil + transaction:transaction]; + }]; }); // Also copy to pasteboard. @@ -594,11 +604,12 @@ typedef void (^DebugLogUploadFailure)(DebugLogUploader *uploader, NSError *error }]; DispatchMainThreadSafe(^{ if (thread) { - OWSMessageSender *messageSender = SSKEnvironment.shared.messageSender; - [ThreadUtil sendMessageWithText:url.absoluteString - inThread:thread - quotedReplyModel:nil - messageSender:messageSender]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [ThreadUtil enqueueMessageWithText:url.absoluteString + inThread:thread + quotedReplyModel:nil + transaction:transaction]; + }]; } else { [Pastelog showFailureAlertWithMessage:@"Could not find last thread."]; } diff --git a/SignalMessaging/ViewControllers/SharingThreadPickerViewController.m b/SignalMessaging/ViewControllers/SharingThreadPickerViewController.m index 277332f85..1ba5c8189 100644 --- a/SignalMessaging/ViewControllers/SharingThreadPickerViewController.m +++ b/SignalMessaging/ViewControllers/SharingThreadPickerViewController.m @@ -236,13 +236,16 @@ typedef void (^SendMessageBlock)(SendCompletionBlock completion); OWSAssertIsOnMainThread(); __block TSOutgoingMessage *outgoingMessage = nil; - outgoingMessage = [ThreadUtil sendMessageWithAttachment:attachment - inThread:self.thread - quotedReplyModel:nil - messageSender:self.messageSender - completion:^(NSError *_Nullable error) { - sendCompletion(error, outgoingMessage); - }]; + // DURABLE CLEANUP - SAE uses non-durable sending to make sure the app is running long enough to complete + // the sending operation. Alternatively, we could use a durable send, but do more to make sure the + // SAE runs as long as it needs. + outgoingMessage = [ThreadUtil sendMessageNonDurablyWithAttachment:attachment + inThread:self.thread + quotedReplyModel:nil + messageSender:self.messageSender + completion:^(NSError *_Nullable error) { + sendCompletion(error, outgoingMessage); + }]; // This is necessary to show progress. self.outgoingMessage = outgoingMessage; @@ -268,7 +271,10 @@ typedef void (^SendMessageBlock)(SendCompletionBlock completion); OWSAssertIsOnMainThread(); __block TSOutgoingMessage *outgoingMessage = nil; - outgoingMessage = [ThreadUtil sendMessageWithText:messageText + // DURABLE CLEANUP - SAE uses non-durable sending to make sure the app is running long enough to complete + // the sending operation. Alternatively, we could use a durable send, but do more to make sure the + // SAE runs as long as it needs. + outgoingMessage = [ThreadUtil sendMessageNonDurablyWithText:messageText inThread:self.thread quotedReplyModel:nil messageSender:self.messageSender @@ -300,6 +306,8 @@ typedef void (^SendMessageBlock)(SendCompletionBlock completion); [ThreadUtil addThreadToProfileWhitelistIfEmptyContactThread:self.thread]; [self tryToSendMessageWithBlock:^(SendCompletionBlock sendCompletion) { OWSAssertIsOnMainThread(); + // TODO - in line with QuotedReply and other message attachments, saving should happen as part of sending + // preparation rather than duplicated here and in the SAE [self.editingDBConnection asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { if (contactShare.avatarImage) { [contactShare.dbRecord saveAvatarImage:contactShare.avatarImage transaction:transaction]; @@ -307,12 +315,12 @@ typedef void (^SendMessageBlock)(SendCompletionBlock completion); } completionBlock:^{ __block TSOutgoingMessage *outgoingMessage = nil; - outgoingMessage = [ThreadUtil sendMessageWithContactShare:contactShare.dbRecord - inThread:self.thread - messageSender:self.messageSender - completion:^(NSError *_Nullable error) { - sendCompletion(error, outgoingMessage); - }]; + outgoingMessage = [ThreadUtil sendMessageNonDurablyWithContactShare:contactShare.dbRecord + inThread:self.thread + messageSender:self.messageSender + completion:^(NSError *_Nullable error) { + sendCompletion(error, outgoingMessage); + }]; // This is necessary to show progress. self.outgoingMessage = outgoingMessage; }]; @@ -537,7 +545,7 @@ typedef void (^SendMessageBlock)(SendCompletionBlock completion); presentViewController:progressAlert animated:YES completion:^(void) { - [self.messageSender enqueueMessage:message + [self.messageSender sendMessage:message success:^(void) { OWSLogInfo(@"Resending attachment succeeded."); dispatch_async(dispatch_get_main_queue(), ^(void) { diff --git a/SignalMessaging/contacts/OWSSyncManager.m b/SignalMessaging/contacts/OWSSyncManager.m index 57e6a27c3..07e2ad32a 100644 --- a/SignalMessaging/contacts/OWSSyncManager.m +++ b/SignalMessaging/contacts/OWSSyncManager.m @@ -88,6 +88,13 @@ NSString *const kSyncManagerLastContactSyncKey = @"kTSStorageManagerOWSSyncManag return SSKEnvironment.shared.messageSender; } +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + OWSAssertDebug(SSKEnvironment.shared.messageSenderJobQueue); + + return SSKEnvironment.shared.messageSenderJobQueue; +} + - (OWSProfileManager *)profileManager { OWSAssertDebug(SSKEnvironment.shared.profileManager); @@ -161,8 +168,10 @@ NSString *const kSyncManagerLastContactSyncKey = @"kTSStorageManagerOWSSyncManag self.isRequestInFlight = YES; + // DURABLE CLEANUP - we could replace the custom durability logic in this class + // with a durable JobQueue. DataSource *dataSource = [DataSourceValue dataSourceWithSyncMessageData:messageData]; - [self.messageSender enqueueTemporaryAttachment:dataSource + [self.messageSender sendTemporaryAttachment:dataSource contentType:OWSMimeTypeApplicationOctetStream inMessage:syncContactsMessage success:^{ @@ -214,13 +223,10 @@ NSString *const kSyncManagerLastContactSyncKey = @"kTSStorageManagerOWSSyncManag OWSSyncConfigurationMessage *syncConfigurationMessage = [[OWSSyncConfigurationMessage alloc] initWithReadReceiptsEnabled:areReadReceiptsEnabled showUnidentifiedDeliveryIndicators:showUnidentifiedDeliveryIndicators]; - [self.messageSender enqueueMessage:syncConfigurationMessage - success:^{ - OWSLogInfo(@"Send configuration sync message succeeded."); - } - failure:^(NSError *error) { - OWSLogError(@"Send configuration sync message failed with error: %@", error); - }]; + + [self.editingDatabaseConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:syncConfigurationMessage transaction:transaction]; + }]; } #pragma mark - Local Sync @@ -253,7 +259,7 @@ NSString *const kSyncManagerLastContactSyncKey = @"kTSStorageManagerOWSSyncManag }]; AnyPromise *promise = [AnyPromise promiseWithResolverBlock:^(PMKResolver resolve) { - [self.messageSender enqueueTemporaryAttachment:dataSource + [self.messageSender sendTemporaryAttachment:dataSource contentType:OWSMimeTypeApplicationOctetStream inMessage:syncContactsMessage success:^{ diff --git a/SignalMessaging/environment/AppSetup.m b/SignalMessaging/environment/AppSetup.m index 85f476cbe..1ee77ad7b 100644 --- a/SignalMessaging/environment/AppSetup.m +++ b/SignalMessaging/environment/AppSetup.m @@ -23,6 +23,7 @@ #import #import #import +#import #import NS_ASSUME_NONNULL_BEGIN @@ -62,6 +63,7 @@ NS_ASSUME_NONNULL_BEGIN OWSContactsManager *contactsManager = [[OWSContactsManager alloc] initWithPrimaryStorage:primaryStorage]; ContactsUpdater *contactsUpdater = [ContactsUpdater new]; OWSMessageSender *messageSender = [[OWSMessageSender alloc] initWithPrimaryStorage:primaryStorage]; + SSKMessageSenderJobQueue *messageSenderJobQueue = [SSKMessageSenderJobQueue new]; OWSProfileManager *profileManager = [[OWSProfileManager alloc] initWithPrimaryStorage:primaryStorage]; OWSMessageManager *messageManager = [[OWSMessageManager alloc] initWithPrimaryStorage:primaryStorage]; OWSBlockingManager *blockingManager = [[OWSBlockingManager alloc] initWithPrimaryStorage:primaryStorage]; @@ -94,6 +96,7 @@ NS_ASSUME_NONNULL_BEGIN [SSKEnvironment setShared:[[SSKEnvironment alloc] initWithContactsManager:contactsManager messageSender:messageSender + messageSenderJobQueue:messageSenderJobQueue profileManager:profileManager primaryStorage:primaryStorage contactsUpdater:contactsUpdater diff --git a/SignalMessaging/profiles/OWSProfileManager.m b/SignalMessaging/profiles/OWSProfileManager.m index 1f320c63b..5b35303ec 100644 --- a/SignalMessaging/profiles/OWSProfileManager.m +++ b/SignalMessaging/profiles/OWSProfileManager.m @@ -122,7 +122,8 @@ typedef void (^ProfileManagerFailureBlock)(NSError *error); #pragma mark - Dependencies -- (TSAccountManager *)tsAccountManager { +- (TSAccountManager *)tsAccountManager +{ return TSAccountManager.sharedInstance; } @@ -136,17 +137,18 @@ typedef void (^ProfileManagerFailureBlock)(NSError *error); return SSKEnvironment.shared.identityManager; } -- (OWSMessageSender *)messageSender { - OWSAssertDebug(SSKEnvironment.shared.messageSender); - - return SSKEnvironment.shared.messageSender; +- (SSKMessageSenderJobQueue *)messageSenderJobQueue +{ + return SSKEnvironment.shared.messageSenderJobQueue; } -- (TSNetworkManager *)networkManager { +- (TSNetworkManager *)networkManager +{ return SSKEnvironment.shared.networkManager; } -- (OWSBlockingManager *)blockingManager { +- (OWSBlockingManager *)blockingManager +{ return SSKEnvironment.shared.blockingManager; } @@ -1407,43 +1409,32 @@ typedef void (^ProfileManagerFailureBlock)(NSError *error); [alertController addAction:[UIAlertAction actionWithTitle:shareTitle style:UIAlertActionStyleDefault handler:^(UIAlertAction *_Nonnull action) { - [self userAddedThreadToProfileWhitelist:thread - success:successHandler]; + [self userAddedThreadToProfileWhitelist:thread]; + successHandler(); }]]; [alertController addAction:[OWSAlerts cancelAction]]; [fromViewController presentViewController:alertController animated:YES completion:nil]; } -- (void)userAddedThreadToProfileWhitelist:(TSThread *)thread success:(void (^)(void))successHandler +- (void)userAddedThreadToProfileWhitelist:(TSThread *)thread { OWSAssertIsOnMainThread(); - OWSProfileKeyMessage *message = - [[OWSProfileKeyMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp] inThread:thread]; - BOOL isFeatureEnabled = NO; if (!isFeatureEnabled) { OWSLogWarn(@"skipping sending profile-key message because the feature is not yet fully available."); [OWSProfileManager.sharedManager addThreadToProfileWhitelist:thread]; - successHandler(); return; } - [self.messageSender enqueueMessage:message - success:^{ - OWSLogInfo(@"Successfully sent profile key message to thread: %@", thread); - [OWSProfileManager.sharedManager addThreadToProfileWhitelist:thread]; + OWSProfileKeyMessage *message = + [[OWSProfileKeyMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp] inThread:thread]; + [OWSProfileManager.sharedManager addThreadToProfileWhitelist:thread]; - dispatch_async(dispatch_get_main_queue(), ^{ - successHandler(); - }); - } - failure:^(NSError *_Nonnull error) { - dispatch_async(dispatch_get_main_queue(), ^{ - OWSLogError(@"Failed to send profile key message to thread: %@", thread); - }); - }]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; } #pragma mark - Notifications diff --git a/SignalMessaging/utils/BlockListUIUtils.m b/SignalMessaging/utils/BlockListUIUtils.m index 8f2429e5f..ec757d16f 100644 --- a/SignalMessaging/utils/BlockListUIUtils.m +++ b/SignalMessaging/utils/BlockListUIUtils.m @@ -238,32 +238,21 @@ typedef void (^BlockAlertCompletionBlock)(UIAlertAction *action); // via params and instead have to create our own sneaky transaction here. [groupThread leaveGroupWithSneakyTransaction]; - [ThreadUtil - sendLeaveGroupMessageInThread:groupThread - presentingViewController:fromViewController - messageSender:messageSender - completion:^(NSError *_Nullable error) { - if (error) { - OWSLogError(@"Failed to leave blocked group with error: %@", error); - } + [ThreadUtil enqueueLeaveGroupMessageInThread:groupThread]; - NSString *groupName - = groupThread.name.length > 0 ? groupThread.name : TSGroupThread.defaultGroupName; + NSString *groupName = groupThread.name.length > 0 ? groupThread.name : TSGroupThread.defaultGroupName; - [self - showOkAlertWithTitle:NSLocalizedString(@"BLOCK_LIST_VIEW_BLOCKED_GROUP_ALERT_TITLE", - @"The title of the 'group blocked' alert.") - message:[NSString - stringWithFormat: - NSLocalizedString( - @"BLOCK_LIST_VIEW_BLOCKED_ALERT_MESSAGE_FORMAT", - @"The message format of the 'conversation blocked' " - @"alert. " - @"Embeds the {{conversation title}}."), - [self formatDisplayNameForAlertMessage:groupName]] - fromViewController:fromViewController - completionBlock:completionBlock]; - }]; + NSString *alertTitle + = NSLocalizedString(@"BLOCK_LIST_VIEW_BLOCKED_GROUP_ALERT_TITLE", @"The title of the 'group blocked' alert."); + NSString *alertBodyFormat = NSLocalizedString(@"BLOCK_LIST_VIEW_BLOCKED_ALERT_MESSAGE_FORMAT", + @"The message format of the 'conversation blocked' alert. Embeds the {{conversation title}}."); + NSString *alertBody = + [NSString stringWithFormat:alertBodyFormat, [self formatDisplayNameForAlertMessage:groupName]]; + + [self showOkAlertWithTitle:alertTitle + message:alertBody + fromViewController:fromViewController + completionBlock:completionBlock]; } #pragma mark - Unblock diff --git a/SignalMessaging/utils/ThreadUtil.h b/SignalMessaging/utils/ThreadUtil.h index 05b9bf69c..e882e98a1 100644 --- a/SignalMessaging/utils/ThreadUtil.h +++ b/SignalMessaging/utils/ThreadUtil.h @@ -38,44 +38,52 @@ NS_ASSUME_NONNULL_BEGIN @class OWSContact; @class OWSQuotedReplyModel; @class TSOutgoingMessage; +@class YapDatabaseReadWriteTransaction; @interface ThreadUtil : NSObject -+ (TSOutgoingMessage *)sendMessageWithText:(NSString *)text - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler; +#pragma mark - Durable Message Enqueue -+ (TSOutgoingMessage *)sendMessageWithText:(NSString *)text - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender; ++ (TSOutgoingMessage *)enqueueMessageWithText:(NSString *)text + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + transaction:(YapDatabaseReadWriteTransaction *)transaction; -+ (TSOutgoingMessage *)sendMessageWithAttachment:(SignalAttachment *)attachment - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion; ++ (TSOutgoingMessage *)enqueueMessageWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel; -// We only should set ignoreErrors in debug or test code. -+ (TSOutgoingMessage *)sendMessageWithAttachment:(SignalAttachment *)attachment - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - ignoreErrors:(BOOL)ignoreErrors - completion:(void (^_Nullable)(NSError *_Nullable error))completion; ++ (TSOutgoingMessage *)enqueueMessageWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + ignoreErrors:(BOOL)ignoreErrors; -+ (TSOutgoingMessage *)sendMessageWithContactShare:(OWSContact *)contactShare - inThread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion; ++ (TSOutgoingMessage *)enqueueMessageWithContactShare:(OWSContact *)contactShare inThread:(TSThread *)thread; ++ (void)enqueueLeaveGroupMessageInThread:(TSGroupThread *)thread; + +#pragma mark - Non-Durable Sending + +// Used by SAE and "reply from lockscreen", otherwise we should use the durable `enqueue` counterpart ++ (TSOutgoingMessage *)sendMessageNonDurablyWithText:(NSString *)text + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + messageSender:(OWSMessageSender *)messageSender + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler; + +// Used by SAE, otherwise we should use the durable `enqueue` counterpart ++ (TSOutgoingMessage *)sendMessageNonDurablyWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + messageSender:(OWSMessageSender *)messageSender + completion:(void (^_Nullable)(NSError *_Nullable error))completion; + +// Used by SAE, otherwise we should use the durable `enqueue` counterpart ++ (TSOutgoingMessage *)sendMessageNonDurablyWithContactShare:(OWSContact *)contactShare + inThread:(TSThread *)thread + messageSender:(OWSMessageSender *)messageSender + completion:(void (^_Nullable)(NSError *_Nullable error))completion; -+ (void)sendLeaveGroupMessageInThread:(TSGroupThread *)thread - presentingViewController:(UIViewController *)presentingViewController - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion; #pragma mark - dynamic interactions diff --git a/SignalMessaging/utils/ThreadUtil.m b/SignalMessaging/utils/ThreadUtil.m index 4d9029050..f654d0da3 100644 --- a/SignalMessaging/utils/ThreadUtil.m +++ b/SignalMessaging/utils/ThreadUtil.m @@ -51,30 +51,141 @@ NS_ASSUME_NONNULL_BEGIN @implementation ThreadUtil -+ (TSOutgoingMessage *)sendMessageWithText:(NSString *)text - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender +#pragma mark - Dependencies + ++ (SSKMessageSenderJobQueue *)messageSenderJobQueue { - return [self sendMessageWithText:text - inThread:thread - quotedReplyModel:quotedReplyModel - messageSender:messageSender - success:^{ - OWSLogInfo(@"Successfully sent message."); - } - failure:^(NSError *error) { - OWSLogWarn(@"Failed to deliver message with error: %@", error); - }]; + return SSKEnvironment.shared.messageSenderJobQueue; } ++ (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} -+ (TSOutgoingMessage *)sendMessageWithText:(NSString *)text - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler +#pragma mark - Durable Message Enqueue + ++ (TSOutgoingMessage *)enqueueMessageWithText:(NSString *)text + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + transaction:(YapDatabaseReadWriteTransaction *)transaction +{ + OWSDisappearingMessagesConfiguration *configuration = + [OWSDisappearingMessagesConfiguration fetchObjectWithUniqueID:thread.uniqueId transaction:transaction]; + + uint32_t expiresInSeconds = (configuration.isEnabled ? configuration.durationSeconds : 0); + + TSOutgoingMessage *message = + [TSOutgoingMessage outgoingMessageInThread:thread + messageBody:text + attachmentId:nil + expiresInSeconds:expiresInSeconds + quotedMessage:[quotedReplyModel buildQuotedMessageForSending]]; + + [message saveWithTransaction:transaction]; + + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + + return message; +} + ++ (TSOutgoingMessage *)enqueueMessageWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel +{ + return [self enqueueMessageWithAttachment:attachment + inThread:thread + quotedReplyModel:quotedReplyModel + ignoreErrors:NO]; +} + ++ (TSOutgoingMessage *)enqueueMessageWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + ignoreErrors:(BOOL)ignoreErrors +{ + OWSAssertIsOnMainThread(); + OWSAssertDebug(attachment); + OWSAssertDebug(!attachment.hasError); + OWSAssertDebug(attachment.mimeType.length > 0); + OWSAssertDebug(thread); + + OWSDisappearingMessagesConfiguration *configuration = + [OWSDisappearingMessagesConfiguration fetchObjectWithUniqueID:thread.uniqueId]; + + uint32_t expiresInSeconds = (configuration.isEnabled ? configuration.durationSeconds : 0); + TSOutgoingMessage *message = + [[TSOutgoingMessage alloc] initOutgoingMessageWithTimestamp:[NSDate ows_millisecondTimeStamp] + inThread:thread + messageBody:attachment.captionText + attachmentIds:[NSMutableArray new] + expiresInSeconds:expiresInSeconds + expireStartedAt:0 + isVoiceMessage:[attachment isVoiceMessage] + groupMetaMessage:TSGroupMetaMessageUnspecified + quotedMessage:[quotedReplyModel buildQuotedMessageForSending] + contactShare:nil]; + + [self.messageSenderJobQueue addMediaMessage:message + dataSource:attachment.dataSource + contentType:attachment.mimeType + sourceFilename:attachment.filenameOrDefault + isTemporaryAttachment:NO]; + + return message; +} + ++ (TSOutgoingMessage *)enqueueMessageWithContactShare:(OWSContact *)contactShare inThread:(TSThread *)thread; +{ + OWSAssertIsOnMainThread(); + OWSAssertDebug(contactShare); + OWSAssertDebug(contactShare.ows_isValid); + OWSAssertDebug(thread); + + OWSDisappearingMessagesConfiguration *configuration = + [OWSDisappearingMessagesConfiguration fetchObjectWithUniqueID:thread.uniqueId]; + + uint32_t expiresInSeconds = (configuration.isEnabled ? configuration.durationSeconds : 0); + TSOutgoingMessage *message = + [[TSOutgoingMessage alloc] initOutgoingMessageWithTimestamp:[NSDate ows_millisecondTimeStamp] + inThread:thread + messageBody:nil + attachmentIds:[NSMutableArray new] + expiresInSeconds:expiresInSeconds + expireStartedAt:0 + isVoiceMessage:NO + groupMetaMessage:TSGroupMetaMessageUnspecified + quotedMessage:nil + contactShare:contactShare]; + + [self.dbConnection asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [message saveWithTransaction:transaction]; + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; + + return message; +} + ++ (void)enqueueLeaveGroupMessageInThread:(TSGroupThread *)thread +{ + OWSAssertDebug([thread isKindOfClass:[TSGroupThread class]]); + + TSOutgoingMessage *message = + [TSOutgoingMessage outgoingMessageInThread:thread groupMetaMessage:TSGroupMetaMessageQuit expiresInSeconds:0]; + + [self.dbConnection asyncReadWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; +} + +// MARK: Non-Durable Sending + ++ (TSOutgoingMessage *)sendMessageNonDurablyWithText:(NSString *)text + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + messageSender:(OWSMessageSender *)messageSender + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler { OWSAssertIsOnMainThread(); OWSAssertDebug(text.length > 0); @@ -91,35 +202,19 @@ NS_ASSUME_NONNULL_BEGIN expiresInSeconds:expiresInSeconds quotedMessage:[quotedReplyModel buildQuotedMessageForSending]]; - [messageSender enqueueMessage:message success:successHandler failure:failureHandler]; + [messageSender sendMessage:message success:successHandler failure:failureHandler]; return message; } -+ (TSOutgoingMessage *)sendMessageWithAttachment:(SignalAttachment *)attachment - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion -{ - return [self sendMessageWithAttachment:attachment - inThread:thread - quotedReplyModel:quotedReplyModel - messageSender:messageSender - ignoreErrors:NO - completion:completion]; -} - -+ (TSOutgoingMessage *)sendMessageWithAttachment:(SignalAttachment *)attachment - inThread:(TSThread *)thread - quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel - messageSender:(OWSMessageSender *)messageSender - ignoreErrors:(BOOL)ignoreErrors - completion:(void (^_Nullable)(NSError *_Nullable error))completion ++ (TSOutgoingMessage *)sendMessageNonDurablyWithAttachment:(SignalAttachment *)attachment + inThread:(TSThread *)thread + quotedReplyModel:(nullable OWSQuotedReplyModel *)quotedReplyModel + messageSender:(OWSMessageSender *)messageSender + completion:(void (^_Nullable)(NSError *_Nullable error))completion { OWSAssertIsOnMainThread(); OWSAssertDebug(attachment); - OWSAssertDebug(ignoreErrors || ![attachment hasError]); OWSAssertDebug([attachment mimeType].length > 0); OWSAssertDebug(thread); OWSAssertDebug(messageSender); @@ -140,7 +235,7 @@ NS_ASSUME_NONNULL_BEGIN quotedMessage:[quotedReplyModel buildQuotedMessageForSending] contactShare:nil]; - [messageSender enqueueAttachment:attachment.dataSource + [messageSender sendAttachment:attachment.dataSource contentType:attachment.mimeType sourceFilename:attachment.filenameOrDefault inMessage:message @@ -164,10 +259,10 @@ NS_ASSUME_NONNULL_BEGIN return message; } -+ (TSOutgoingMessage *)sendMessageWithContactShare:(OWSContact *)contactShare - inThread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion ++ (TSOutgoingMessage *)sendMessageNonDurablyWithContactShare:(OWSContact *)contactShare + inThread:(TSThread *)thread + messageSender:(OWSMessageSender *)messageSender + completion:(void (^_Nullable)(NSError *_Nullable error))completion { OWSAssertIsOnMainThread(); OWSAssertDebug(contactShare); @@ -191,7 +286,7 @@ NS_ASSUME_NONNULL_BEGIN quotedMessage:nil contactShare:contactShare]; - [messageSender enqueueMessage:message + [messageSender sendMessage:message success:^{ OWSLogDebug(@"Successfully sent contact share."); if (completion) { @@ -212,47 +307,6 @@ NS_ASSUME_NONNULL_BEGIN return message; } -+ (void)sendLeaveGroupMessageInThread:(TSGroupThread *)thread - presentingViewController:(UIViewController *)presentingViewController - messageSender:(OWSMessageSender *)messageSender - completion:(void (^_Nullable)(NSError *_Nullable error))completion -{ - OWSAssertDebug([thread isKindOfClass:[TSGroupThread class]]); - OWSAssertDebug(presentingViewController); - OWSAssertDebug(messageSender); - - NSString *groupName = thread.name.length > 0 ? thread.name : TSGroupThread.defaultGroupName; - NSString *title = [NSString - stringWithFormat:NSLocalizedString(@"GROUP_REMOVING", @"Modal text when removing a group"), groupName]; - UIAlertController *removingFromGroup = - [UIAlertController alertControllerWithTitle:title message:nil preferredStyle:UIAlertControllerStyleAlert]; - [presentingViewController presentViewController:removingFromGroup animated:YES completion:nil]; - - TSOutgoingMessage *message = - [TSOutgoingMessage outgoingMessageInThread:thread groupMetaMessage:TSGroupMetaMessageQuit expiresInSeconds:0]; - [messageSender enqueueMessage:message - success:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - [presentingViewController dismissViewControllerAnimated:YES - completion:^{ - if (completion) { - completion(nil); - } - }]; - }); - } - failure:^(NSError *error) { - dispatch_async(dispatch_get_main_queue(), ^{ - [presentingViewController dismissViewControllerAnimated:YES - completion:^{ - if (completion) { - completion(error); - } - }]; - }); - }]; -} - #pragma mark - Dynamic Interactions + (ThreadDynamicInteractions *)ensureDynamicInteractionsForThread:(TSThread *)thread diff --git a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h index d3b00bbe2..d6a131b96 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h +++ b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h @@ -193,7 +193,9 @@ typedef NS_ENUM(NSInteger, TSGroupMetaMessage) { transaction:(YapDatabaseReadWriteTransaction *)transaction; // This method is used to record a failed send to all "sending" recipients. -- (void)updateWithSendingError:(NSError *)error; +- (void)updateWithSendingError:(NSError *)error + transaction:(YapDatabaseReadWriteTransaction *)transaction + NS_SWIFT_NAME(update(sendingError:transaction:)); - (void)updateWithHasSyncedTranscript:(BOOL)hasSyncedTranscript transaction:(YapDatabaseReadWriteTransaction *)transaction; diff --git a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m index c78065105..e2e515a0f 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m +++ b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m @@ -575,23 +575,20 @@ NSString *NSStringForOutgoingMessageRecipientState(OWSOutgoingMessageRecipientSt #pragma mark - Update With... Methods -- (void)updateWithSendingError:(NSError *)error +- (void)updateWithSendingError:(NSError *)error transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(error); - - [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [self applyChangeToSelfAndLatestCopy:transaction - changeBlock:^(TSOutgoingMessage *message) { - // Mark any "sending" recipients as "failed." - for (TSOutgoingMessageRecipientState *recipientState in message.recipientStateMap - .allValues) { - if (recipientState.state == OWSOutgoingMessageRecipientStateSending) { - recipientState.state = OWSOutgoingMessageRecipientStateFailed; - } + [self applyChangeToSelfAndLatestCopy:transaction + changeBlock:^(TSOutgoingMessage *message) { + // Mark any "sending" recipients as "failed." + for (TSOutgoingMessageRecipientState *recipientState in message.recipientStateMap + .allValues) { + if (recipientState.state == OWSOutgoingMessageRecipientStateSending) { + recipientState.state = OWSOutgoingMessageRecipientStateFailed; } - [message setMostRecentFailureText:error.localizedDescription]; - }]; - }]; + } + [message setMostRecentFailureText:error.localizedDescription]; + }]; } - (void)updateWithAllSendingRecipientsMarkedAsFailedWithTansaction:(YapDatabaseReadWriteTransaction *)transaction diff --git a/SignalServiceKit/src/Messages/OWSBlockingManager.m b/SignalServiceKit/src/Messages/OWSBlockingManager.m index 46c810e48..9c81aa91c 100644 --- a/SignalServiceKit/src/Messages/OWSBlockingManager.m +++ b/SignalServiceKit/src/Messages/OWSBlockingManager.m @@ -387,11 +387,12 @@ NSString *const kOWSBlockingManager_SyncedBlockedGroupIdsKey = @"kOWSBlockingMan OWSBlockedPhoneNumbersMessage *message = [[OWSBlockedPhoneNumbersMessage alloc] initWithPhoneNumbers:blockedPhoneNumbers groupIds:blockedGroupIds]; - [self.messageSender enqueueMessage:message + [self.messageSender sendMessage:message success:^{ OWSLogInfo(@"Successfully sent blocked phone numbers sync message"); - // Record the last set of "blocked phone numbers" which we successfully synced. + // DURABLE CLEANUP - we could replace the custom durability logic in this class + // with a durable JobQueue. [self saveSyncedBlockListWithPhoneNumbers:blockedPhoneNumbers groupIds:blockedGroupIds]; } failure:^(NSError *error) { diff --git a/SignalServiceKit/src/Messages/OWSIdentityManager.m b/SignalServiceKit/src/Messages/OWSIdentityManager.m index 590209bdf..aa3fe3460 100644 --- a/SignalServiceKit/src/Messages/OWSIdentityManager.m +++ b/SignalServiceKit/src/Messages/OWSIdentityManager.m @@ -640,16 +640,20 @@ NSString *const kNSNotificationName_IdentityStateDidChange = @"kNSNotificationNa // subsequently OWSOutgoingNullMessage *nullMessage = [[OWSOutgoingNullMessage alloc] initWithContactThread:contactThread verificationStateSyncMessage:message]; - [self.messageSender enqueueMessage:nullMessage + + // DURABLE CLEANUP - we could replace the custom durability logic in this class + // with a durable JobQueue. + [self.messageSender sendMessage:nullMessage success:^{ OWSLogInfo(@"Successfully sent verification state NullMessage"); - [self.messageSender enqueueMessage:message + [self.messageSender sendMessage:message success:^{ OWSLogInfo(@"Successfully sent verification state sync message"); // Record that this verification state was successfully synced. - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction * transaction) { - [self clearSyncMessageForRecipientId:message.verificationForRecipientId transaction:transaction]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self clearSyncMessageForRecipientId:message.verificationForRecipientId + transaction:transaction]; }]; } failure:^(NSError *error) { @@ -662,7 +666,7 @@ NSString *const kNSNotificationName_IdentityStateDidChange = @"kNSNotificationNa OWSLogInfo(@"Removing retries for syncing verification state, since user is no longer registered: %@", message.verificationForRecipientId); // Otherwise this will fail forever. - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction * transaction) { + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self clearSyncMessageForRecipientId:message.verificationForRecipientId transaction:transaction]; }]; } diff --git a/SignalServiceKit/src/Messages/OWSMessageManager.m b/SignalServiceKit/src/Messages/OWSMessageManager.m index 55daef3f5..04cb9d311 100644 --- a/SignalServiceKit/src/Messages/OWSMessageManager.m +++ b/SignalServiceKit/src/Messages/OWSMessageManager.m @@ -107,11 +107,9 @@ NS_ASSUME_NONNULL_BEGIN return SSKEnvironment.shared.contactsManager; } -- (OWSMessageSender *)messageSender +- (SSKMessageSenderJobQueue *)messageSenderJobQueue { - OWSAssertDebug(SSKEnvironment.shared.messageSender); - - return SSKEnvironment.shared.messageSender; + return SSKEnvironment.shared.messageSenderJobQueue; } - (OWSBlockingManager *)blockingManager @@ -553,13 +551,8 @@ NS_ASSUME_NONNULL_BEGIN OWSSyncGroupsRequestMessage *syncGroupsRequestMessage = [[OWSSyncGroupsRequestMessage alloc] initWithThread:thread groupId:groupId]; - [self.messageSender enqueueMessage:syncGroupsRequestMessage - success:^{ - OWSLogWarn(@"Successfully sent Request Group Info message."); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send Request Group Info message with error: %@", error); - }]; + + [self.messageSenderJobQueue addMessage:syncGroupsRequestMessage transaction:transaction]; } - (id)profileManager @@ -833,15 +826,11 @@ NS_ASSUME_NONNULL_BEGIN return; } DataSource *dataSource = [DataSourceValue dataSourceWithSyncMessageData:syncData]; - [self.messageSender enqueueTemporaryAttachment:dataSource - contentType:OWSMimeTypeApplicationOctetStream - inMessage:syncGroupsMessage - success:^{ - OWSLogInfo(@"Successfully sent Groups response syncMessage."); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send Groups response syncMessage with error: %@", error); - }]; + [self.messageSenderJobQueue addMediaMessage:syncGroupsMessage + dataSource:dataSource + contentType:OWSMimeTypeApplicationOctetStream + sourceFilename:nil + isTemporaryAttachment:YES]; } else if (syncMessage.request.type == SSKProtoSyncMessageRequestTypeBlocked) { OWSLogInfo(@"Received request for block list"); [self.blockingManager syncBlockList]; @@ -993,44 +982,6 @@ NS_ASSUME_NONNULL_BEGIN [self handleReceivedEnvelope:envelope withDataMessage:dataMessage attachmentIds:@[] transaction:transaction]; } -- (void)sendGroupUpdateForThread:(TSGroupThread *)gThread message:(TSOutgoingMessage *)message -{ - if (!gThread) { - OWSFailDebug(@"Missing gThread."); - return; - } - if (!gThread.groupModel) { - OWSFailDebug(@"Missing gThread.groupModel."); - return; - } - if (!message) { - OWSFailDebug(@"Missing message."); - return; - } - - if (gThread.groupModel.groupImage) { - NSData *data = UIImagePNGRepresentation(gThread.groupModel.groupImage); - DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithData:data fileExtension:@"png"]; - [self.messageSender enqueueTemporaryAttachment:dataSource - contentType:OWSMimeTypeImagePng - inMessage:message - success:^{ - OWSLogDebug(@"Successfully sent group update with avatar"); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send group avatar update with error: %@", error); - }]; - } else { - [self.messageSender enqueueMessage:message - success:^{ - OWSLogDebug(@"Successfully sent group update"); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send group update with error: %@", error); - }]; - } -} - - (void)handleGroupInfoRequest:(SSKProtoEnvelope *)envelope dataMessage:(SSKProtoDataMessage *)dataMessage transaction:(YapDatabaseReadWriteTransaction *)transaction @@ -1093,7 +1044,18 @@ NS_ASSUME_NONNULL_BEGIN // Only send this group update to the requester. [message updateWithSendingToSingleGroupRecipient:envelope.source transaction:transaction]; - [self sendGroupUpdateForThread:gThread message:message]; + if (gThread.groupModel.groupImage) { + NSData *data = UIImagePNGRepresentation(gThread.groupModel.groupImage); + DataSource *_Nullable dataSource = [DataSourceValue dataSourceWithData:data fileExtension:@"png"]; + [self.messageSenderJobQueue addMediaMessage:message + dataSource:dataSource + contentType:OWSMimeTypeImagePng + sourceFilename:nil + isTemporaryAttachment:YES]; + + } else { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + } } - (TSIncomingMessage *_Nullable)handleReceivedEnvelope:(SSKProtoEnvelope *)envelope diff --git a/SignalServiceKit/src/Messages/OWSMessageSender.h b/SignalServiceKit/src/Messages/OWSMessageSender.h index 761bfa8fa..a7b182ebc 100644 --- a/SignalServiceKit/src/Messages/OWSMessageSender.h +++ b/SignalServiceKit/src/Messages/OWSMessageSender.h @@ -10,6 +10,7 @@ extern const NSUInteger kOversizeTextMessageSizeThreshold; @class OWSBlockingManager; @class OWSPrimaryStorage; +@class TSAttachmentStream; @class TSInvalidIdentityKeySendingErrorMessage; @class TSNetworkManager; @class TSOutgoingMessage; @@ -41,33 +42,49 @@ NS_SWIFT_NAME(MessageSender) /** * Send and resend text messages or resend messages with existing attachments. - * If you haven't yet created the attachment, see the ` enqueueAttachment:` variants. + * If you haven't yet created the attachment, see the `sendAttachment:` variants. */ -// TODO: make transaction nonnull and remove `sendMessage:success:failure` -- (void)enqueueMessage:(TSOutgoingMessage *)message - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler; +- (void)sendMessage:(TSOutgoingMessage *)message + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler; /** * Takes care of allocating and uploading the attachment, then sends the message. * Only necessary to call once. If sending fails, retry with `sendMessage:`. */ -- (void)enqueueAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - sourceFilename:(nullable NSString *)sourceFilename - inMessage:(TSOutgoingMessage *)outgoingMessage - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler; +- (void)sendAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + sourceFilename:(nullable NSString *)sourceFilename + inMessage:(TSOutgoingMessage *)outgoingMessage + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler; /** - * Same as ` enqueueAttachment:`, but deletes the local copy of the attachment after sending. + * Same as `sendAttachment:`, but deletes the local copy of the attachment after sending. * Used for sending sync request data, not for user visible attachments. */ -- (void)enqueueTemporaryAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - inMessage:(TSOutgoingMessage *)outgoingMessage - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler; +- (void)sendTemporaryAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + inMessage:(TSOutgoingMessage *)outgoingMessage + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler; + +@end + +@interface OutgoingMessagePreparer : NSObject + +/// Persists all necessary data to disk before sending, e.g. generate thumbnails ++ (void)prepareMessageForSending:(TSOutgoingMessage *)message + quotedThumbnailAttachments:(NSArray **)outQuotedThumbnailAttachments + contactShareAvatarAttachment:(TSAttachmentStream **)outContactShareAvatarAttachment + transaction:(YapDatabaseReadWriteTransaction *)transaction; + +/// Writes attachment to disk and applies original filename to message attributes ++ (void)prepareAttachmentWithDataSource:(DataSource *)dataSource + contentType:(NSString *)contentType + sourceFilename:(nullable NSString *)sourceFilename + inMessage:(TSOutgoingMessage *)outgoingMessage + completionHandler:(void (^)(NSError *_Nullable error))completionHandler; @end diff --git a/SignalServiceKit/src/Messages/OWSMessageSender.m b/SignalServiceKit/src/Messages/OWSMessageSender.m index 4689caec2..7958744a1 100644 --- a/SignalServiceKit/src/Messages/OWSMessageSender.m +++ b/SignalServiceKit/src/Messages/OWSMessageSender.m @@ -135,7 +135,6 @@ void AssertIsOnSendingQueue() return self; } - self.remainingRetries = 6; _message = message; _messageSender = messageSender; _dbConnection = dbConnection; @@ -201,8 +200,6 @@ void AssertIsOnSendingQueue() - (void)didFailWithError:(NSError *)error { - [self.message updateWithSendingError:error]; - OWSLogDebug(@"failed with error: %@", error); self.failureHandler(error); } @@ -314,9 +311,9 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; } } -- (void)enqueueMessage:(TSOutgoingMessage *)message - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler +- (void)sendMessage:(TSOutgoingMessage *)message + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler { OWSAssertDebug(message); if (message.body.length > 0) { @@ -341,24 +338,10 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; // So we're using YDB behavior to ensure this invariant, which is a bit // unorthodox. [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - if (message.quotedMessage) { - quotedThumbnailAttachments = - [message.quotedMessage createThumbnailAttachmentsIfNecessaryWithTransaction:transaction]; - } - - if (message.contactShare.avatarAttachmentId != nil) { - TSAttachment *avatarAttachment = [message.contactShare avatarAttachmentWithTransaction:transaction]; - if ([avatarAttachment isKindOfClass:[TSAttachmentStream class]]) { - contactShareAvatarAttachment = (TSAttachmentStream *)avatarAttachment; - } else { - OWSFailDebug(@"unexpected avatarAttachment: %@", avatarAttachment); - } - } - - // All outgoing messages should be saved at the time they are enqueued. - [message saveWithTransaction:transaction]; - // When we start a message send, all "failed" recipients should be marked as "sending". - [message updateWithMarkingAllUnsentRecipientsAsSendingWithTransaction:transaction]; + [OutgoingMessagePreparer prepareMessageForSending:message + quotedThumbnailAttachments:"edThumbnailAttachments + contactShareAvatarAttachment:&contactShareAvatarAttachment + transaction:transaction]; }]; NSOperationQueue *sendingQueue = [self sendingQueueForMessage:message]; @@ -409,11 +392,11 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; }); } -- (void)enqueueTemporaryAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - inMessage:(TSOutgoingMessage *)message - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler +- (void)sendTemporaryAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + inMessage:(TSOutgoingMessage *)message + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler { OWSAssertDebug(dataSource); @@ -431,46 +414,33 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; [message remove]; }; - [self enqueueAttachment:dataSource - contentType:contentType - sourceFilename:nil - inMessage:message - success:successWithDeleteHandler - failure:failureWithDeleteHandler]; + [self sendAttachment:dataSource + contentType:contentType + sourceFilename:nil + inMessage:message + success:successWithDeleteHandler + failure:failureWithDeleteHandler]; } -- (void)enqueueAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - sourceFilename:(nullable NSString *)sourceFilename - inMessage:(TSOutgoingMessage *)message - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler +- (void)sendAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + sourceFilename:(nullable NSString *)sourceFilename + inMessage:(TSOutgoingMessage *)message + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler { OWSAssertDebug(dataSource); - - dispatch_async([OWSDispatch attachmentsQueue], ^{ - TSAttachmentStream *attachmentStream = - [[TSAttachmentStream alloc] initWithContentType:contentType - byteCount:(UInt32)dataSource.dataLength - sourceFilename:sourceFilename]; - if (message.isVoiceMessage) { - attachmentStream.attachmentType = TSAttachmentTypeVoiceMessage; - } - - if (![attachmentStream writeDataSource:dataSource]) { - OWSProdError([OWSAnalyticsEvents messageSenderErrorCouldNotWriteAttachment]); - NSError *error = OWSErrorMakeWriteAttachmentDataError(); - return failureHandler(error); - } - - [attachmentStream save]; - [message.attachmentIds addObject:attachmentStream.uniqueId]; - if (sourceFilename) { - message.attachmentFilenameMap[attachmentStream.uniqueId] = sourceFilename; - } - - [self enqueueMessage:message success:successHandler failure:failureHandler]; - }); + [OutgoingMessagePreparer prepareAttachmentWithDataSource:dataSource + contentType:contentType + sourceFilename:sourceFilename + inMessage:message + completionHandler:^(NSError *_Nullable error) { + if (error) { + failureHandler(error); + return; + } + [self sendMessage:message success:successHandler failure:failureHandler]; + }]; } - (void)sendMessageToService:(TSOutgoingMessage *)message @@ -866,16 +836,6 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; return nil; } - if (messageSend.remainingAttempts == 0) { - OWSLogWarn(@"Terminal failure to build any device messages. Giving up with exception: %@", exception); - NSError *error = OWSErrorMakeFailedToSendOutgoingMessageError(); - // Since we've already repeatedly failed to build messages, it's unlikely that repeating the whole process - // will succeed. - [error setIsRetryable:NO]; - *errorHandle = error; - return nil; - } - OWSLogWarn(@"Could not build device messages: %@", exception); NSError *error = OWSErrorMakeFailedToSendOutgoingMessageError(); [error setIsRetryable:YES]; @@ -1149,9 +1109,6 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; void (^retrySend)(void) = ^void() { if (messageSend.remainingAttempts <= 0) { - // Since we've already repeatedly failed to send to the messaging API, - // it's unlikely that repeating the whole process will succeed. - [responseError setIsRetryable:NO]; return messageSend.failure(responseError); } @@ -1702,4 +1659,79 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; @end +@implementation OutgoingMessagePreparer + +#pragma mark - Dependencies + ++ (YapDatabaseConnection *)dbConnection +{ + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection; +} + +#pragma mark - + ++ (void)prepareMessageForSending:(TSOutgoingMessage *)message + quotedThumbnailAttachments:(NSArray **)outQuotedThumbnailAttachments + contactShareAvatarAttachment:(TSAttachmentStream *_Nullable *)outContactShareAvatarAttachment + transaction:(YapDatabaseReadWriteTransaction *)transaction +{ + if (message.quotedMessage) { + *outQuotedThumbnailAttachments = + [message.quotedMessage createThumbnailAttachmentsIfNecessaryWithTransaction:transaction]; + } + + if (message.contactShare.avatarAttachmentId != nil) { + TSAttachment *avatarAttachment = [message.contactShare avatarAttachmentWithTransaction:transaction]; + if ([avatarAttachment isKindOfClass:[TSAttachmentStream class]]) { + *outContactShareAvatarAttachment = (TSAttachmentStream *)avatarAttachment; + } else { + OWSFailDebug(@"unexpected avatarAttachment: %@", avatarAttachment); + } + } + + // All outgoing messages should be saved at the time they are enqueued. + [message saveWithTransaction:transaction]; + // When we start a message send, all "failed" recipients should be marked as "sending". + [message updateWithMarkingAllUnsentRecipientsAsSendingWithTransaction:transaction]; +} + ++ (void)prepareAttachmentWithDataSource:(DataSource *)dataSource + contentType:(NSString *)contentType + sourceFilename:(nullable NSString *)sourceFilename + inMessage:(TSOutgoingMessage *)outgoingMessage + completionHandler:(void (^)(NSError *_Nullable error))completionHandler +{ + OWSAssertDebug(dataSource); + + dispatch_async([OWSDispatch attachmentsQueue], ^{ + TSAttachmentStream *attachmentStream = + [[TSAttachmentStream alloc] initWithContentType:contentType + byteCount:(UInt32)dataSource.dataLength + sourceFilename:sourceFilename]; + if (outgoingMessage.isVoiceMessage) { + attachmentStream.attachmentType = TSAttachmentTypeVoiceMessage; + } + + if (![attachmentStream writeDataSource:dataSource]) { + OWSProdError([OWSAnalyticsEvents messageSenderErrorCouldNotWriteAttachment]); + NSError *error = OWSErrorMakeWriteAttachmentDataError(); + completionHandler(error); + } + + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [attachmentStream saveWithTransaction:transaction]; + + [outgoingMessage.attachmentIds addObject:attachmentStream.uniqueId]; + if (sourceFilename) { + outgoingMessage.attachmentFilenameMap[attachmentStream.uniqueId] = sourceFilename; + } + [outgoingMessage saveWithTransaction:transaction]; + }]; + + completionHandler(nil); + }); +} + +@end + NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.h b/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.h deleted file mode 100644 index 276f833fe..000000000 --- a/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.h +++ /dev/null @@ -1,25 +0,0 @@ -// Created by Michael Kirk on 9/25/16. -// Copyright © 2016 Open Whisper Systems. All rights reserved. - -NS_ASSUME_NONNULL_BEGIN - -@class OWSDisappearingMessagesConfiguration; -@class OWSMessageSender; -@class TSThread; - -@interface OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob : NSObject - -- (instancetype)init NS_UNAVAILABLE; -- (instancetype)initWithConfiguration:(OWSDisappearingMessagesConfiguration *)configuration - thread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender NS_DESIGNATED_INITIALIZER; - -+ (void)runWithConfiguration:(OWSDisappearingMessagesConfiguration *)configuration - thread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender; - -- (void)run; - -@end - -NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.m b/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.m deleted file mode 100644 index 59b803b09..000000000 --- a/SignalServiceKit/src/Messages/OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.m +++ /dev/null @@ -1,64 +0,0 @@ -// -// Copyright (c) 2018 Open Whisper Systems. All rights reserved. -// - -#import "OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob.h" -#import "OWSDisappearingMessagesConfigurationMessage.h" -#import "OWSMessageSender.h" - -NS_ASSUME_NONNULL_BEGIN - -@interface OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob () - -@property (nonatomic, readonly) OWSDisappearingMessagesConfiguration *configuration; -@property (nonatomic, readonly) OWSMessageSender *messageSender; -@property (nonatomic, readonly) TSThread *thread; - -@end - -@implementation OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob - -- (instancetype)initWithConfiguration:(OWSDisappearingMessagesConfiguration *)configuration - thread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender -{ - self = [super init]; - if (!self) { - return self; - } - - _thread = thread; - _configuration = configuration; - _messageSender = messageSender; - - return self; -} - -+ (void)runWithConfiguration:(OWSDisappearingMessagesConfiguration *)configuration - thread:(TSThread *)thread - messageSender:(OWSMessageSender *)messageSender -{ - OWSNotifyRemoteOfUpdatedDisappearingConfigurationJob *job = - [[self alloc] initWithConfiguration:configuration thread:thread messageSender:messageSender]; - [job run]; -} - -- (void)run -{ - OWSDisappearingMessagesConfigurationMessage *message = - [[OWSDisappearingMessagesConfigurationMessage alloc] initWithConfiguration:self.configuration - thread:self.thread]; - - [self.messageSender enqueueMessage:message - success:^{ - OWSLogDebug(@"Successfully notified %@ of new disappearing messages configuration", self.thread); - } - failure:^(NSError *error) { - OWSLogError( - @"Failed to notify %@ of new disappearing messages configuration with error: %@", self.thread, error); - }]; -} - -@end - -NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Messages/OWSOutgoingReceiptManager.m b/SignalServiceKit/src/Messages/OWSOutgoingReceiptManager.m index fd19ec3b9..c81b161ef 100644 --- a/SignalServiceKit/src/Messages/OWSOutgoingReceiptManager.m +++ b/SignalServiceKit/src/Messages/OWSOutgoingReceiptManager.m @@ -189,11 +189,13 @@ NSString *const kOutgoingReadReceiptManagerCollection = @"kOutgoingReadReceiptMa } AnyPromise *sendPromise = [AnyPromise promiseWithResolverBlock:^(PMKResolver resolve) { - [self.messageSender enqueueMessage:message + [self.messageSender sendMessage:message success:^{ OWSLogInfo( @"Successfully sent %lu %@ receipts to sender.", (unsigned long)timestamps.count, receiptName); + // DURABLE CLEANUP - we could replace the custom durability logic in this class + // with a durable JobQueue. [self dequeueReceiptsWithRecipientId:recipientId timestamps:timestamps receiptType:receiptType]; // The value doesn't matter, we just need any non-NSError value. diff --git a/SignalServiceKit/src/Messages/OWSReadReceiptManager.m b/SignalServiceKit/src/Messages/OWSReadReceiptManager.m index f64ebd4a9..ce42b8962 100644 --- a/SignalServiceKit/src/Messages/OWSReadReceiptManager.m +++ b/SignalServiceKit/src/Messages/OWSReadReceiptManager.m @@ -171,11 +171,9 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE #pragma mark - Dependencies -- (OWSMessageSender *)messageSender +- (SSKMessageSenderJobQueue *)messageSenderJobQueue { - OWSAssertDebug(SSKEnvironment.shared.messageSender); - - return SSKEnvironment.shared.messageSender; + return SSKEnvironment.shared.messageSenderJobQueue; } - (OWSOutgoingReceiptManager *)outgoingReceiptManager @@ -219,14 +217,9 @@ NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsE OWSReadReceiptsForLinkedDevicesMessage *message = [[OWSReadReceiptsForLinkedDevicesMessage alloc] initWithReadReceipts:readReceiptsForLinkedDevices]; - [self.messageSender enqueueMessage:message - success:^{ - OWSLogInfo(@"Successfully sent %lu read receipt to linked devices.", - (unsigned long)readReceiptsForLinkedDevices.count); - } - failure:^(NSError *error) { - OWSLogError(@"Failed to send read receipt to linked devices with error: %@", error); - }]; + [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { + [self.messageSenderJobQueue addMessage:message transaction:transaction]; + }]; } BOOL didWork = readReceiptsForLinkedDevices.count > 0; diff --git a/SignalServiceKit/src/Network/MessageSenderJobQueue.swift b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift new file mode 100644 index 000000000..ac745c60b --- /dev/null +++ b/SignalServiceKit/src/Network/MessageSenderJobQueue.swift @@ -0,0 +1,205 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation + +public extension Error { + var isRetryable: Bool { + return (self as NSError).isRetryable + } +} + +@objc(SSKMessageSenderJobQueue) +public class MessageSenderJobQueue: NSObject, JobQueue { + + // MARK: + + @objc(addMessage:transaction:) + public func add(message: TSOutgoingMessage, transaction: YapDatabaseReadWriteTransaction) { + self.add(message: message, removeMessageAfterSending: false, transaction: transaction) + } + + @objc(addMediaMessage:dataSource:contentType:sourceFilename:isTemporaryAttachment:) + public func add(mediaMessage: TSOutgoingMessage, dataSource: DataSource, contentType: String, sourceFilename: String?, isTemporaryAttachment: Bool) { + OutgoingMessagePreparer.prepareAttachment(with: dataSource, + contentType: contentType, + sourceFilename: sourceFilename, + in: mediaMessage) { error in + if let error = error { + self.dbConnection.readWrite { transaction in + mediaMessage.update(sendingError: error, transaction: transaction) + } + } else { + self.dbConnection.readWrite { transaction in + self.add(message: mediaMessage, removeMessageAfterSending: isTemporaryAttachment, transaction: transaction) + } + } + } + } + + private func add(message: TSOutgoingMessage, removeMessageAfterSending: Bool, transaction: YapDatabaseReadWriteTransaction) { + let jobRecord: SSKMessageSenderJobRecord + do { + jobRecord = try SSKMessageSenderJobRecord(message: message, removeMessageAfterSending: false, label: self.jobRecordLabel) + } catch { + owsFailDebug("failed to build job: \(error)") + return + } + self.add(jobRecord: jobRecord, transaction: transaction) + } + + // MARK: JobQueue + + public typealias DurableOperationType = MessageSenderOperation + public static let jobRecordLabel: String = "MessageSender" + public static let maxRetries: UInt = 10 + + public var jobRecordLabel: String { + return type(of: self).jobRecordLabel + } + + @objc + public func setup() { + defaultSetup() + } + + @objc + public var isReady: Bool = false { + didSet { + if isReady { + DispatchQueue.global().async { + self.workStep() + } + } + } + } + + public func didMarkAsReady(oldJobRecord: SSKMessageSenderJobRecord, transaction: YapDatabaseReadWriteTransaction) { + if let messageId = oldJobRecord.messageId, let message = TSOutgoingMessage.fetch(uniqueId: messageId, transaction: transaction) { + message.updateWithMarkingAllUnsentRecipientsAsSending(with: transaction) + } + } + + public func buildOperation(jobRecord: SSKMessageSenderJobRecord, transaction: YapDatabaseReadTransaction) throws -> MessageSenderOperation { + let message: TSOutgoingMessage + if let invisibleMessage = jobRecord.invisibleMessage { + message = invisibleMessage + } else if let messageId = jobRecord.messageId, let fetchedMessage = TSOutgoingMessage.fetch(uniqueId: messageId, transaction: transaction) { + message = fetchedMessage + } else { + assert(jobRecord.messageId != nil) + throw JobError.obsolete(description: "message no longer exists") + } + + return MessageSenderOperation(message: message, jobRecord: jobRecord) + } + + var senderQueues: [String: OperationQueue] = [:] + let defaultQueue: OperationQueue = { + let operationQueue = OperationQueue() + operationQueue.name = "DefaultSendingQueue" + operationQueue.maxConcurrentOperationCount = 1 + + return operationQueue + }() + + public func operationQueue(jobRecord: SSKMessageSenderJobRecord) -> OperationQueue { + guard let threadId = jobRecord.threadId else { + return defaultQueue + } + + guard let existingQueue = senderQueues[threadId] else { + let operationQueue = OperationQueue() + operationQueue.name = "SendingQueue:\(threadId)" + operationQueue.maxConcurrentOperationCount = 1 + + senderQueues[threadId] = operationQueue + + return operationQueue + } + + return existingQueue + } +} + +public class MessageSenderOperation: OWSOperation, DurableOperation { + + // MARK: DurableOperation + + public let jobRecord: SSKMessageSenderJobRecord + + weak public var durableOperationDelegate: MessageSenderJobQueue? + + public var operation: Operation { + return self + } + + // MARK: Init + + let message: TSOutgoingMessage + + init(message: TSOutgoingMessage, jobRecord: SSKMessageSenderJobRecord) { + self.message = message + self.jobRecord = jobRecord + super.init() + } + + // MARK: Dependencies + + var messageSender: MessageSender { + return SSKEnvironment.shared.messageSender + } + + var dbConnection: YapDatabaseConnection { + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection + } + + // MARK: OWSOperation + + override public func run() { + self.messageSender.send(message, success: reportSuccess, failure: reportError) + } + + override public func didSucceed() { + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperationDidSucceed(self, transaction: transaction) + if self.jobRecord.removeMessageAfterSending { + self.message.remove(with: transaction) + } + } + } + + override public func didReportError(_ error: Error) { + Logger.debug("remainingRetries: \(self.remainingRetries)") + + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperation(self, didReportError: error, transaction: transaction) + } + } + + override public func retryDelay() -> dispatch_time_t { + guard !CurrentAppContext().isRunningTests else { + return 0 + } + + // Arbitrary backoff factor... + // 10 failures, wait ~1min + let backoffFactor = 1.9 + let maxBackoff = kHourInterval + + let seconds = 0.1 * min(maxBackoff, pow(backoffFactor, Double(self.jobRecord.failureCount))) + return UInt64(seconds) * NSEC_PER_SEC + } + + override public func didFail(error: Error) { + self.dbConnection.readWrite { transaction in + self.durableOperationDelegate?.durableOperation(self, didFailWithError: error, transaction: transaction) + + self.message.update(sendingError: error, transaction: transaction) + if self.jobRecord.removeMessageAfterSending { + self.message.remove(with: transaction) + } + } + } +} diff --git a/SignalServiceKit/src/SSKEnvironment.h b/SignalServiceKit/src/SSKEnvironment.h index 131664c39..7e76a4302 100644 --- a/SignalServiceKit/src/SSKEnvironment.h +++ b/SignalServiceKit/src/SSKEnvironment.h @@ -20,6 +20,7 @@ NS_ASSUME_NONNULL_BEGIN @class OWSOutgoingReceiptManager; @class OWSPrimaryStorage; @class OWSReadReceiptManager; +@class SSKMessageSenderJobQueue; @class TSAccountManager; @class TSNetworkManager; @class TSSocketManager; @@ -30,11 +31,13 @@ NS_ASSUME_NONNULL_BEGIN @protocol OWSCallMessageHandler; @protocol ProfileManagerProtocol; @protocol OWSUDManager; +@protocol OWSSyncManagerProtocol; @interface SSKEnvironment : NSObject - (instancetype)initWithContactsManager:(id)contactsManager messageSender:(OWSMessageSender *)messageSender + messageSenderJobQueue:(SSKMessageSenderJobQueue *)messageSenderJobQueue profileManager:(id)profileManager primaryStorage:(OWSPrimaryStorage *)primaryStorage contactsUpdater:(ContactsUpdater *)contactsUpdater @@ -68,6 +71,7 @@ NS_ASSUME_NONNULL_BEGIN @property (nonatomic, readonly) id contactsManager; @property (nonatomic, readonly) OWSMessageSender *messageSender; +@property (nonatomic, readonly) SSKMessageSenderJobQueue *messageSenderJobQueue; @property (nonatomic, readonly) id profileManager; @property (nonatomic, readonly) OWSPrimaryStorage *primaryStorage; @property (nonatomic, readonly) ContactsUpdater *contactsUpdater; diff --git a/SignalServiceKit/src/SSKEnvironment.m b/SignalServiceKit/src/SSKEnvironment.m index a3f003d0b..07355297e 100644 --- a/SignalServiceKit/src/SSKEnvironment.m +++ b/SignalServiceKit/src/SSKEnvironment.m @@ -49,6 +49,7 @@ static SSKEnvironment *sharedSSKEnvironment; - (instancetype)initWithContactsManager:(id)contactsManager messageSender:(OWSMessageSender *)messageSender + messageSenderJobQueue:(SSKMessageSenderJobQueue *)messageSenderJobQueue profileManager:(id)profileManager primaryStorage:(OWSPrimaryStorage *)primaryStorage contactsUpdater:(ContactsUpdater *)contactsUpdater @@ -67,7 +68,8 @@ static SSKEnvironment *sharedSSKEnvironment; contactDiscoveryService:(ContactDiscoveryService *)contactDiscoveryService readReceiptManager:(OWSReadReceiptManager *)readReceiptManager outgoingReceiptManager:(OWSOutgoingReceiptManager *)outgoingReceiptManager - syncManager:(id)syncManager { + syncManager:(id)syncManager +{ self = [super init]; if (!self) { return self; @@ -75,6 +77,7 @@ static SSKEnvironment *sharedSSKEnvironment; OWSAssertDebug(contactsManager); OWSAssertDebug(messageSender); + OWSAssertDebug(messageSenderJobQueue); OWSAssertDebug(profileManager); OWSAssertDebug(primaryStorage); OWSAssertDebug(contactsUpdater); @@ -97,6 +100,7 @@ static SSKEnvironment *sharedSSKEnvironment; _contactsManager = contactsManager; _messageSender = messageSender; + _messageSenderJobQueue = messageSenderJobQueue; _profileManager = profileManager; _primaryStorage = primaryStorage; _contactsUpdater = contactsUpdater; diff --git a/SignalServiceKit/src/SignalServiceKit.h b/SignalServiceKit/src/SignalServiceKit.h index a702a7b22..eb4382c5f 100644 --- a/SignalServiceKit/src/SignalServiceKit.h +++ b/SignalServiceKit/src/SignalServiceKit.h @@ -3,6 +3,8 @@ // // Anything used by Swift outside of the framework must be imported. -#import "OWSFileSystem.h" -#import "OWSOperation.h" -#import "OWSSyncManagerProtocol.h" +#import +#import +#import +#import +#import diff --git a/SignalServiceKit/src/Storage/OWSPrimaryStorage.m b/SignalServiceKit/src/Storage/OWSPrimaryStorage.m index b7000f0f7..e68d263c0 100644 --- a/SignalServiceKit/src/Storage/OWSPrimaryStorage.m +++ b/SignalServiceKit/src/Storage/OWSPrimaryStorage.m @@ -212,6 +212,7 @@ void VerifyRegistrationsForPrimaryStorage(OWSStorage *storage) [OWSFailedAttachmentDownloadsJob asyncRegisterDatabaseExtensionsWithPrimaryStorage:self]; [OWSMediaGalleryFinder asyncRegisterDatabaseExtensionsWithPrimaryStorage:self]; [TSDatabaseView asyncRegisterLazyRestoreAttachmentsDatabaseView:self]; + [SSKJobRecordFinder asyncRegisterDatabaseExtensionObjCWithStorage:self]; [self.database flushExtensionRequestsWithCompletionQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) diff --git a/SignalServiceKit/src/Storage/SSKIncrementingIdFinder.swift b/SignalServiceKit/src/Storage/SSKIncrementingIdFinder.swift new file mode 100644 index 000000000..d8f06970c --- /dev/null +++ b/SignalServiceKit/src/Storage/SSKIncrementingIdFinder.swift @@ -0,0 +1,27 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation + +@objc +public class SSKIncrementingIdFinder: NSObject { + + private static let collectionName = "IncrementingIdCollection" + + @objc + public class func previousId(key: String, transaction: YapDatabaseReadTransaction) -> UInt64 { + let previousId: UInt64 = transaction.object(forKey: key, inCollection: collectionName) as? UInt64 ?? 0 + return previousId + } + + @objc + public class func nextId(key: String, transaction: YapDatabaseReadWriteTransaction) -> UInt64 { + let previousId: UInt64 = transaction.object(forKey: key, inCollection: collectionName) as? UInt64 ?? 0 + let nextId: UInt64 = previousId + 1 + + transaction.setObject(nextId, forKey: key, inCollection: collectionName) + Logger.debug("key: \(key) nextId: \(nextId)") + return nextId + } +} diff --git a/SignalServiceKit/src/Storage/SSKJobRecord.h b/SignalServiceKit/src/Storage/SSKJobRecord.h new file mode 100644 index 000000000..c14a04bca --- /dev/null +++ b/SignalServiceKit/src/Storage/SSKJobRecord.h @@ -0,0 +1,57 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import "TSYapDatabaseObject.h" + +NS_ASSUME_NONNULL_BEGIN + +extern NSErrorDomain const SSKJobRecordErrorDomain; + +typedef NS_ERROR_ENUM(SSKJobRecordErrorDomain, JobRecordError){ + JobRecordError_AssertionError = 100, + JobRecordError_IllegalStateTransition, +}; + +typedef NS_ENUM(NSUInteger, SSKJobRecordStatus) { + SSKJobRecordStatus_Unknown, + SSKJobRecordStatus_Ready, + SSKJobRecordStatus_Running, + SSKJobRecordStatus_PermanentlyFailed, + SSKJobRecordStatus_Obsolete +}; + +#pragma mark - + +@interface SSKJobRecord : TSYapDatabaseObject + +@property (nonatomic) NSUInteger failureCount; +@property (nonatomic) NSString *label; + +- (instancetype)initWithLabel:(NSString *)label NS_DESIGNATED_INITIALIZER; +- (nullable instancetype)initWithCoder:(NSCoder *)coder NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithUniqueId:(NSString *_Nullable)uniqueId NS_UNAVAILABLE; +- (instancetype)init NS_UNAVAILABLE; + +@property (readonly, nonatomic) SSKJobRecordStatus status; +@property (nonatomic, readonly) UInt64 sortId; + +- (BOOL)saveAsStartedWithTransaction:(YapDatabaseReadWriteTransaction *)transaction + error:(NSError **)outError NS_SWIFT_NAME(saveAsStarted(transaction:)); + +- (void)saveAsPermanentlyFailedWithTransaction:(YapDatabaseReadWriteTransaction *)transaction + NS_SWIFT_NAME(saveAsPermanentlyFailed(transaction:)); + +- (void)saveAsObsoleteWithTransaction:(YapDatabaseReadWriteTransaction *)transaction + NS_SWIFT_NAME(saveAsObsolete(transaction:)); + +- (BOOL)saveRunningAsReadyWithTransaction:(YapDatabaseReadWriteTransaction *)transaction + error:(NSError **)outError NS_SWIFT_NAME(saveRunningAsReady(transaction:)); + +- (BOOL)addFailureWithWithTransaction:(YapDatabaseReadWriteTransaction *)transaction + error:(NSError **)outError NS_SWIFT_NAME(addFailure(transaction:)); + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Storage/SSKJobRecord.m b/SignalServiceKit/src/Storage/SSKJobRecord.m new file mode 100644 index 000000000..d27714167 --- /dev/null +++ b/SignalServiceKit/src/Storage/SSKJobRecord.m @@ -0,0 +1,127 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import "SSKJobRecord.h" +#import + +NS_ASSUME_NONNULL_BEGIN + +NSErrorDomain const SSKJobRecordErrorDomain = @"SignalServiceKit.JobRecord"; + +#pragma mark - +@interface SSKJobRecord () + +@property (nonatomic) SSKJobRecordStatus status; +@property (nonatomic) UInt64 sortId; + +@end + +@implementation SSKJobRecord + +- (instancetype)initWithLabel:(NSString *)label +{ + self = [super init]; + if (!self) { + return self; + } + + _status = SSKJobRecordStatus_Ready; + _label = label; + + return self; +} + +- (nullable instancetype)initWithCoder:(NSCoder *)coder +{ + return [super initWithCoder:coder]; +} + +#pragma mark - TSYapDatabaseObject Overrides + ++ (NSString *)collection +{ + // To avoid a plethora of identical JobRecord subclasses, all job records share + // a common collection and JobQueue's distinguish their behavior by the job's + // `label` + return @"JobRecord"; +} + +- (void)saveWithTransaction:(YapDatabaseReadWriteTransaction *)transaction +{ + if (self.sortId == 0) { + self.sortId = [SSKIncrementingIdFinder nextIdWithKey:self.class.collection transaction:transaction]; + } + [super saveWithTransaction:transaction]; +} + +#pragma mark - + +- (BOOL)saveAsStartedWithTransaction:(YapDatabaseReadWriteTransaction *)transaction error:(NSError **)outError +{ + if (self.status != SSKJobRecordStatus_Ready) { + *outError = + [NSError errorWithDomain:SSKJobRecordErrorDomain code:JobRecordError_IllegalStateTransition userInfo:nil]; + return NO; + } + self.status = SSKJobRecordStatus_Running; + [self saveWithTransaction:transaction]; + + return YES; +} + +- (void)saveAsPermanentlyFailedWithTransaction:(YapDatabaseReadWriteTransaction *)transaction +{ + self.status = SSKJobRecordStatus_PermanentlyFailed; + [self saveWithTransaction:transaction]; +} + +- (void)saveAsObsoleteWithTransaction:(YapDatabaseReadWriteTransaction *)transaction +{ + self.status = SSKJobRecordStatus_Obsolete; + [self saveWithTransaction:transaction]; +} + +- (BOOL)saveRunningAsReadyWithTransaction:(YapDatabaseReadWriteTransaction *)transaction error:(NSError **)outError +{ + switch (self.status) { + case SSKJobRecordStatus_Running: { + self.status = SSKJobRecordStatus_Ready; + [self saveWithTransaction:transaction]; + return YES; + } + case SSKJobRecordStatus_Ready: + case SSKJobRecordStatus_PermanentlyFailed: + case SSKJobRecordStatus_Obsolete: + case SSKJobRecordStatus_Unknown: { + *outError = [NSError errorWithDomain:SSKJobRecordErrorDomain + code:JobRecordError_IllegalStateTransition + userInfo:nil]; + return NO; + } + } +} + +- (BOOL)addFailureWithWithTransaction:(YapDatabaseReadWriteTransaction *)transaction error:(NSError **)outError +{ + switch (self.status) { + case SSKJobRecordStatus_Running: { + self.failureCount++; + [self saveWithTransaction:transaction]; + return YES; + } + case SSKJobRecordStatus_Ready: + case SSKJobRecordStatus_PermanentlyFailed: + case SSKJobRecordStatus_Obsolete: + case SSKJobRecordStatus_Unknown: { + *outError = [NSError errorWithDomain:SSKJobRecordErrorDomain + code:JobRecordError_IllegalStateTransition + userInfo:nil]; + return NO; + } + } +} + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.h b/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.h new file mode 100644 index 000000000..039bbd9f8 --- /dev/null +++ b/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.h @@ -0,0 +1,29 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import "SSKJobRecord.h" + +NS_ASSUME_NONNULL_BEGIN + +@class TSOutgoingMessage; + +@interface SSKMessageSenderJobRecord : SSKJobRecord + +@property (nonatomic, readonly, nullable) NSString *messageId; +@property (nonatomic, readonly, nullable) NSString *threadId; +@property (nonatomic, readonly, nullable) TSOutgoingMessage *invisibleMessage; +@property (nonatomic, readonly) BOOL removeMessageAfterSending; + +- (nullable instancetype)initWithMessage:(TSOutgoingMessage *)message + removeMessageAfterSending:(BOOL)removeMessageAfterSending + label:(NSString *)label + error:(NSError **)outError NS_DESIGNATED_INITIALIZER; + +- (nullable instancetype)initWithCoder:(NSCoder *)coder NS_DESIGNATED_INITIALIZER; + +- (instancetype)initWithLabel:(nullable NSString *)label NS_UNAVAILABLE; + +@end + +NS_ASSUME_NONNULL_END diff --git a/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.m b/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.m new file mode 100644 index 000000000..c54357c02 --- /dev/null +++ b/SignalServiceKit/src/Storage/SSKMessageSenderJobRecord.m @@ -0,0 +1,47 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +#import "SSKMessageSenderJobRecord.h" +#import "TSOutgoingMessage.h" + +@implementation SSKMessageSenderJobRecord + +#pragma mark + +- (nullable instancetype)initWithCoder:(NSCoder *)coder +{ + return [super initWithCoder:coder]; +} + +- (nullable instancetype)initWithMessage:(TSOutgoingMessage *)message + removeMessageAfterSending:(BOOL)removeMessageAfterSending + label:(NSString *)label + error:(NSError **)outError; +{ + self = [super initWithLabel:label]; + if (!self) { + return self; + } + + if (message.shouldBeSaved) { + _messageId = message.uniqueId; + if (_messageId == nil) { + *outError = [NSError errorWithDomain:SSKJobRecordErrorDomain + code:JobRecordError_AssertionError + userInfo:@{ NSDebugDescriptionErrorKey : @"messageId wasn't set" }]; + return nil; + } + _invisibleMessage = nil; + } else { + _messageId = nil; + _invisibleMessage = message; + } + + _removeMessageAfterSending = removeMessageAfterSending; + _threadId = message.uniqueThreadId; + + return self; +} + +@end diff --git a/SignalServiceKit/src/Storage/TSYapDatabaseObject.h b/SignalServiceKit/src/Storage/TSYapDatabaseObject.h index 2c31c81a4..d4cf3950f 100644 --- a/SignalServiceKit/src/Storage/TSYapDatabaseObject.h +++ b/SignalServiceKit/src/Storage/TSYapDatabaseObject.h @@ -20,6 +20,7 @@ NS_ASSUME_NONNULL_BEGIN * * @return Initialized object */ +- (instancetype)init NS_DESIGNATED_INITIALIZER; - (instancetype)initWithUniqueId:(NSString *_Nullable)uniqueId NS_DESIGNATED_INITIALIZER; - (nullable instancetype)initWithCoder:(NSCoder *)coder NS_DESIGNATED_INITIALIZER; diff --git a/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m b/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m index c55a09885..65966b2f1 100644 --- a/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m +++ b/SignalServiceKit/src/TestUtils/MockSSKEnvironment.m @@ -54,6 +54,7 @@ NS_ASSUME_NONNULL_BEGIN id contactsManager = [OWSFakeContactsManager new]; TSNetworkManager *networkManager = [OWSFakeNetworkManager new]; OWSMessageSender *messageSender = [OWSFakeMessageSender new]; + SSKMessageSenderJobQueue *messageSenderJobQueue = [SSKMessageSenderJobQueue new]; OWSMessageManager *messageManager = [[OWSMessageManager alloc] initWithPrimaryStorage:primaryStorage]; OWSBlockingManager *blockingManager = [[OWSBlockingManager alloc] initWithPrimaryStorage:primaryStorage]; @@ -76,6 +77,7 @@ NS_ASSUME_NONNULL_BEGIN self = [super initWithContactsManager:contactsManager messageSender:messageSender + messageSenderJobQueue:messageSenderJobQueue profileManager:[OWSFakeProfileManager new] primaryStorage:primaryStorage contactsUpdater:[OWSFakeContactsUpdater new] diff --git a/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.h b/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.h index 00ce55670..bda37439f 100644 --- a/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.h +++ b/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.h @@ -8,11 +8,15 @@ NS_ASSUME_NONNULL_BEGIN #ifdef DEBUG +typedef void (^messageBlock)(TSOutgoingMessage *); + @interface OWSFakeMessageSender : OWSMessageSender -@property (nonatomic, nullable) dispatch_block_t enqueueMessageBlock; -@property (nonatomic, nullable) dispatch_block_t enqueueAttachmentBlock; -@property (nonatomic, nullable) dispatch_block_t enqueueTemporaryAttachmentBlock; +@property (nonatomic, nullable) NSError *stubbedFailingError; + +@property (nonatomic, nullable) messageBlock sendMessageWasCalledBlock; +@property (nonatomic, nullable) messageBlock sendAttachmentWasCalledBlock; +@property (nonatomic, nullable) messageBlock sendTemporaryAttachmentWasCalledBlock; @end diff --git a/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.m b/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.m index 6b5ebca13..549e1783e 100644 --- a/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.m +++ b/SignalServiceKit/src/TestUtils/OWSFakeMessageSender.m @@ -10,39 +10,54 @@ NS_ASSUME_NONNULL_BEGIN @implementation OWSFakeMessageSender -- (void)enqueueMessage:(TSOutgoingMessage *)message +- (void)sendMessage:(TSOutgoingMessage *)message + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler +{ + if (self.sendMessageWasCalledBlock) { + self.sendMessageWasCalledBlock(message); + } + + if (self.stubbedFailingError) { + failureHandler(self.stubbedFailingError); + } else { + successHandler(); + } +} + +- (void)sendAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + sourceFilename:(nullable NSString *)sourceFilename + inMessage:(TSOutgoingMessage *)outgoingMessage success:(void (^)(void))successHandler failure:(void (^)(NSError *error))failureHandler { - if (self.enqueueMessageBlock) { - self.enqueueMessageBlock(); + if (self.sendAttachmentWasCalledBlock) { + self.sendAttachmentWasCalledBlock(outgoingMessage); + } + + if (self.stubbedFailingError) { + failureHandler(self.stubbedFailingError); + } else { + successHandler(); } - successHandler(); } -- (void)enqueueAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - sourceFilename:(nullable NSString *)sourceFilename - inMessage:(TSOutgoingMessage *)outgoingMessage - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler +- (void)sendTemporaryAttachment:(DataSource *)dataSource + contentType:(NSString *)contentType + inMessage:(TSOutgoingMessage *)outgoingMessage + success:(void (^)(void))successHandler + failure:(void (^)(NSError *error))failureHandler { - if (self.enqueueAttachmentBlock) { - self.enqueueAttachmentBlock(); + if (self.sendTemporaryAttachmentWasCalledBlock) { + self.sendTemporaryAttachmentWasCalledBlock(outgoingMessage); } - successHandler(); -} -- (void)enqueueTemporaryAttachment:(DataSource *)dataSource - contentType:(NSString *)contentType - inMessage:(TSOutgoingMessage *)outgoingMessage - success:(void (^)(void))successHandler - failure:(void (^)(NSError *error))failureHandler -{ - if (self.enqueueTemporaryAttachmentBlock) { - self.enqueueTemporaryAttachmentBlock(); + if (self.stubbedFailingError) { + failureHandler(self.stubbedFailingError); + } else { + successHandler(); } - successHandler(); } diff --git a/SignalServiceKit/src/Util/JobQueue.swift b/SignalServiceKit/src/Util/JobQueue.swift new file mode 100644 index 000000000..42325b3a8 --- /dev/null +++ b/SignalServiceKit/src/Util/JobQueue.swift @@ -0,0 +1,320 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation + +/// JobQueue - A durable work queue +/// +/// When work needs to be done, add it to the JobQueue. +/// The JobQueue will persist a JobRecord to be sure that work can be restarted if the app is killed. +/// +/// The actual work, is carried out in an operation which the JobQueue spins off, based on the contents +/// of a JobRecord. +/// +/// For a concrete example, adding a message to MessageSenderJobQueue, first records a SSKMessageSenderJobRecord. +/// The MessageSenderJobQueue can use that SSKMessageSenderJobRecord to create a MessageSenderOperation which +/// takes care of the actual business of communicating with the service. + +public enum JobError: Error { + case assertionFailure(description: String) + case obsolete(description: String) +} + +public protocol DurableOperation: class { + associatedtype JobRecordType: SSKJobRecord + associatedtype DurableOperationDelegateType: DurableOperationDelegate + + var jobRecord: JobRecordType { get } + var durableOperationDelegate: DurableOperationDelegateType? { get set } + var operation: Operation { get } + var remainingRetries: UInt { get set } +} + +public protocol DurableOperationDelegate: class { + associatedtype DurableOperationType: DurableOperation + + func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) + func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction) + func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) +} + +public protocol JobQueue: DurableOperationDelegate { + typealias DurableOperationDelegateType = Self + typealias JobRecordType = DurableOperationType.JobRecordType + + // MARK: Dependencies + + var dbConnection: YapDatabaseConnection { get } + var finder: JobRecordFinder { get } + + // MARK: Default Implementations + + func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) + func restartOldJobs() + func workStep() + func defaultSetup() + + // MARK: Required + + var jobRecordLabel: String { get } + + var isReady: Bool { get set } + func setup() + func didMarkAsReady(oldJobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) + + func operationQueue(jobRecord: JobRecordType) -> OperationQueue + func buildOperation(jobRecord: JobRecordType, transaction: YapDatabaseReadTransaction) throws -> DurableOperationType + + static var maxRetries: UInt { get } +} + +public extension JobQueue { + + // MARK: Depenencies + + var dbConnection: YapDatabaseConnection { + return SSKEnvironment.shared.primaryStorage.dbReadWriteConnection + } + + var finder: JobRecordFinder { + return JobRecordFinder() + } + + // MARK: + + func add(jobRecord: JobRecordType, transaction: YapDatabaseReadWriteTransaction) { + assert(jobRecord.status == .ready) + jobRecord.save(with: transaction) + + transaction.addCompletionQueue(.global()) { + self.workStep() + } + } + + func workStep() { + Logger.debug("") + + guard isReady else { + if !CurrentAppContext().isRunningTests { + owsFailDebug("not ready") + } + + Logger.error("not ready") + return + } + + self.dbConnection.readWrite { transaction in + guard let nextJob: JobRecordType = self.finder.getNextReady(label: self.jobRecordLabel, transaction: transaction) as? JobRecordType else { + Logger.verbose("nothing left to enqueue") + return + } + + do { + try nextJob.saveAsStarted(transaction: transaction) + + let operationQueue = self.operationQueue(jobRecord: nextJob) + let durableOperation = try self.buildOperation(jobRecord: nextJob, transaction: transaction) + + durableOperation.durableOperationDelegate = self as? Self.DurableOperationType.DurableOperationDelegateType + assert(durableOperation.durableOperationDelegate != nil) + + let remainingRetries = self.remainingRetries(durableOperation: durableOperation) + durableOperation.remainingRetries = remainingRetries + + Logger.debug("adding operation: \(durableOperation) with remainingRetries: \(remainingRetries)") + operationQueue.addOperation(durableOperation.operation) + } catch JobError.assertionFailure(let description) { + owsFailDebug("assertion failure: \(description)") + nextJob.saveAsPermanentlyFailed(transaction: transaction) + } catch JobError.obsolete(let description) { + // TODO is this even worthwhile to have obsolete state? Should we just delete the task outright? + Logger.verbose("marking obsolete task as such. description:\(description)") + nextJob.saveAsObsolete(transaction: transaction) + } catch { + owsFailDebug("unexpected error") + } + + DispatchQueue.global().async { + self.workStep() + } + } + } + + public func restartOldJobs() { + self.dbConnection.readWrite { transaction in + let runningRecords = self.finder.allRecords(label: self.jobRecordLabel, status: .running, transaction: transaction) + Logger.info("marking old `running` JobRecords as ready: \(runningRecords.count)") + for record in runningRecords { + guard let jobRecord = record as? JobRecordType else { + owsFailDebug("unexpectred jobRecord: \(record)") + continue + } + do { + try jobRecord.saveRunningAsReady(transaction: transaction) + self.didMarkAsReady(oldJobRecord: jobRecord, transaction: transaction) + } catch { + owsFailDebug("failed to mark old running records as ready error: \(error)") + jobRecord.saveAsPermanentlyFailed(transaction: transaction) + } + } + } + } + + /// Unless you need special handling, your setup method can be as simple as + /// + /// func setup() { + /// defaultSetup() + /// } + /// + /// So you might ask, why not just rename this method to `setup`? Because + /// `setup` is called from objc, and default implementations from a protocol + /// cannot be marked as @objc. + func defaultSetup() { + guard !isReady else { + owsFailDebug("already ready already") + return + } + self.restartOldJobs() + + self.isReady = true + } + + func remainingRetries(durableOperation: DurableOperationType) -> UInt { + let maxRetries = type(of: self).maxRetries + let failureCount = durableOperation.jobRecord.failureCount + + guard maxRetries > failureCount else { + return 0 + } + + return maxRetries - failureCount + } + + // MARK: DurableOperationDelegate + + func durableOperationDidSucceed(_ operation: DurableOperationType, transaction: YapDatabaseReadWriteTransaction) { + operation.jobRecord.remove(with: transaction) + } + + func durableOperation(_ operation: DurableOperationType, didReportError: Error, transaction: YapDatabaseReadWriteTransaction) { + do { + try operation.jobRecord.addFailure(transaction: transaction) + } catch { + owsFailDebug("error while addingFailure: \(error)") + operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction) + } + } + + func durableOperation(_ operation: DurableOperationType, didFailWithError error: Error, transaction: YapDatabaseReadWriteTransaction) { + operation.jobRecord.saveAsPermanentlyFailed(transaction: transaction) + } +} + +@objc(SSKJobRecordFinder) +public class JobRecordFinder: NSObject, Finder { + + typealias ExtensionType = YapDatabaseSecondaryIndex + typealias TransactionType = YapDatabaseSecondaryIndexTransaction + + enum JobRecordField: String { + case status, label, sortId + } + + func getNextReady(label: String, transaction: YapDatabaseReadTransaction) -> SSKJobRecord? { + var result: SSKJobRecord? + self.enumerateJobRecords(label: label, status: .ready, transaction: transaction) { jobRecord, stopPointer in + result = jobRecord + stopPointer.pointee = true + } + return result + } + + func allRecords(label: String, status: SSKJobRecordStatus, transaction: YapDatabaseReadTransaction) -> [SSKJobRecord] { + var result: [SSKJobRecord] = [] + self.enumerateJobRecords(label: label, status: status, transaction: transaction) { jobRecord, stopPointer in + result.append(jobRecord) + } + return result + } + + func enumerateJobRecords(label: String, status: SSKJobRecordStatus, transaction: YapDatabaseReadTransaction, block: @escaping (SSKJobRecord, UnsafeMutablePointer) -> Void) { + let queryFormat = String(format: "WHERE %@ = ? AND %@ = ? ORDER BY %@", JobRecordField.status.rawValue, JobRecordField.label.rawValue, JobRecordField.sortId.rawValue) + let query = YapDatabaseQuery(string: queryFormat, parameters: [status.rawValue, label]) + + self.ext(transaction: transaction).enumerateKeysAndObjects(matching: query) { collection, key, object, stopPointer in + guard let jobRecord = object as? SSKJobRecord else { + owsFailDebug("expecting jobRecord but found: \(object)") + return + } + block(jobRecord, stopPointer) + } + } + + static var dbExtensionName: String { + return "SecondaryIndexJobRecord" + } + + @objc + public class func asyncRegisterDatabaseExtensionObjC(storage: OWSStorage) { + asyncRegisterDatabaseExtension(storage: storage) + } + + static var dbExtensionConfig: YapDatabaseSecondaryIndex { + let setup = YapDatabaseSecondaryIndexSetup() + setup.addColumn(JobRecordField.sortId.rawValue, with: .integer) + setup.addColumn(JobRecordField.status.rawValue, with: .integer) + setup.addColumn(JobRecordField.label.rawValue, with: .text) + + let block: YapDatabaseSecondaryIndexWithObjectBlock = { transaction, dict, collection, key, object in + guard let jobRecord = object as? SSKJobRecord else { + return + } + + dict[JobRecordField.sortId.rawValue] = jobRecord.sortId + dict[JobRecordField.status.rawValue] = jobRecord.status.rawValue + dict[JobRecordField.label.rawValue] = jobRecord.label + } + + let handler = YapDatabaseSecondaryIndexHandler.withObjectBlock(block) + + let options = YapDatabaseSecondaryIndexOptions() + let whitelist = YapWhitelistBlacklist(whitelist: Set([SSKJobRecord.collection()])) + options.allowedCollections = whitelist + + return YapDatabaseSecondaryIndex.init(setup: setup, handler: handler, versionTag: "2", options: options) + } +} + +protocol Finder { + associatedtype ExtensionType: YapDatabaseExtension + associatedtype TransactionType: YapDatabaseExtensionTransaction + + static var dbExtensionName: String { get } + static var dbExtensionConfig: ExtensionType { get } + + func ext(transaction: YapDatabaseReadTransaction) -> TransactionType + + static func asyncRegisterDatabaseExtension(storage: OWSStorage) + static func testingOnly_ensureDatabaseExtensionRegistered(storage: OWSStorage) +} + +extension Finder { + + func ext(transaction: YapDatabaseReadTransaction) -> TransactionType { + return transaction.ext(type(of: self).dbExtensionName) as! TransactionType + } + + static func asyncRegisterDatabaseExtension(storage: OWSStorage) { + storage.asyncRegister(dbExtensionConfig, withName: dbExtensionName) + } + + // Only for testing. + static func testingOnly_ensureDatabaseExtensionRegistered(storage: OWSStorage) { + guard storage.registeredExtension(dbExtensionName) == nil else { + return + } + + storage.register(dbExtensionConfig, withName: dbExtensionName) + } +} diff --git a/SignalServiceKit/src/Util/OWSOperation.h b/SignalServiceKit/src/Util/OWSOperation.h index 55e30bed0..463a7e3cb 100644 --- a/SignalServiceKit/src/Util/OWSOperation.h +++ b/SignalServiceKit/src/Util/OWSOperation.h @@ -45,9 +45,15 @@ typedef NS_ENUM(NSInteger, OWSOperationState) { // Called at most one time. - (void)didCancel; +// Called zero or more times, retry may be possible +- (void)didReportError:(NSError *)error; + // Called at most one time, once retry is no longer possible. - (void)didFailWithError:(NSError *)error NS_SWIFT_NAME(didFail(error:)); +// How long to wait before retry, if possible +- (dispatch_time_t)retryDelay; + #pragma mark - Success/Error - Do Not Override // Report that the operation completed successfully. diff --git a/SignalServiceKit/src/Util/OWSOperation.m b/SignalServiceKit/src/Util/OWSOperation.m index e416b7f26..6e3da8a92 100644 --- a/SignalServiceKit/src/Util/OWSOperation.m +++ b/SignalServiceKit/src/Util/OWSOperation.m @@ -91,6 +91,13 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; // Override in subclass if necessary } +// Called zero or more times, retry may be possible +- (void)didReportError:(NSError *)error +{ + // no-op + // Override in subclass if necessary +} + // Called at most one time, once retry is no longer possible. - (void)didFailWithError:(NSError *)error { @@ -144,6 +151,8 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; error.isRetryable, (unsigned long)self.remainingRetries); + [self didReportError:error]; + if (error.isFatal) { [self failOperationWithError:error]; return; @@ -161,13 +170,17 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished"; self.remainingRetries--; - // TODO Do we want some kind of exponential backoff? - // I'm not sure that there is a one-size-fits all backoff approach - dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, self.retryDelay), dispatch_get_main_queue(), ^{ [self run]; }); } +// Override in subclass if you want something more sophisticated, e.g. exponential backoff +- (dispatch_time_t)retryDelay +{ + return (0.1 * NSEC_PER_SEC); +} + #pragma mark - Life Cycle - (void)failOperationWithError:(NSError *)error diff --git a/SignalServiceKit/tests/Messages/OWSMessageManagerTest.m b/SignalServiceKit/tests/Messages/OWSMessageManagerTest.m index 4aad9499d..580bb8b99 100644 --- a/SignalServiceKit/tests/Messages/OWSMessageManagerTest.m +++ b/SignalServiceKit/tests/Messages/OWSMessageManagerTest.m @@ -2,7 +2,6 @@ // Copyright (c) 2018 Open Whisper Systems. All rights reserved. // -#import "OWSMessageManager.h" #import "ContactsManagerProtocol.h" #import "ContactsUpdater.h" #import "MockSSKEnvironment.h" @@ -11,6 +10,7 @@ #import "OWSFakeMessageSender.h" #import "OWSFakeNetworkManager.h" #import "OWSIdentityManager.h" +#import "OWSMessageManager.h" #import "OWSMessageSender.h" #import "OWSPrimaryStorage.h" #import "SSKBaseTestObjC.h" @@ -63,7 +63,7 @@ NS_ASSUME_NONNULL_BEGIN OWSAssert([SSKEnvironment.shared.messageSender isKindOfClass:[OWSFakeMessageSender class]]); OWSFakeMessageSender *fakeMessageSender = (OWSFakeMessageSender *)SSKEnvironment.shared.messageSender; - fakeMessageSender.enqueueTemporaryAttachmentBlock = ^{ + fakeMessageSender.sendTemporaryAttachmentWasCalledBlock = ^{ [messageWasSent fulfill]; }; diff --git a/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift new file mode 100644 index 000000000..93e7ac429 --- /dev/null +++ b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift @@ -0,0 +1,224 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import XCTest +@testable import SignalServiceKit + +class MessageSenderJobQueueTest: SSKBaseTestSwift { + + override func setUp() { + super.setUp() + } + + override func tearDown() { + super.tearDown() + } + + // MARK: Dependencies + + private var messageSender: OWSFakeMessageSender { + return MockSSKEnvironment.shared.messageSender as! OWSFakeMessageSender + } + + // MARK: + + func test_messageIsSent() { + let message: TSOutgoingMessage = OutgoingMessageFactory().create() + + let expectation = sentExpectation(message: message) + + let jobQueue = MessageSenderJobQueue() + jobQueue.setup() + self.readWrite { transaction in + jobQueue.add(message: message, transaction: transaction) + } + + self.wait(for: [expectation], timeout: 0.1) + } + + func test_waitsForReady() { + let message: TSOutgoingMessage = OutgoingMessageFactory().create() + + let sentBeforeReadyExpectation = sentExpectation(message: message) + sentBeforeReadyExpectation.isInverted = true + + let jobQueue = MessageSenderJobQueue() + + self.readWrite { transaction in + jobQueue.add(message: message, transaction: transaction) + } + + self.wait(for: [sentBeforeReadyExpectation], timeout: 0.1) + + let sentAfterReadyExpectation = sentExpectation(message: message) + + jobQueue.setup() + + self.wait(for: [sentAfterReadyExpectation], timeout: 0.1) + } + + func test_respectsQueueOrder() { + let message1: TSOutgoingMessage = OutgoingMessageFactory().create() + let message2: TSOutgoingMessage = OutgoingMessageFactory().create() + let message3: TSOutgoingMessage = OutgoingMessageFactory().create() + + let jobQueue = MessageSenderJobQueue() + self.readWrite { transaction in + jobQueue.add(message: message1, transaction: transaction) + jobQueue.add(message: message2, transaction: transaction) + jobQueue.add(message: message3, transaction: transaction) + } + + let sendGroup = DispatchGroup() + sendGroup.enter() + sendGroup.enter() + sendGroup.enter() + + var sentMessages: [TSOutgoingMessage] = [] + messageSender.sendMessageWasCalledBlock = { sentMessage in + sentMessages.append(sentMessage) + sendGroup.leave() + } + + jobQueue.setup() + + switch sendGroup.wait(timeout: .now() + 1.0) { + case .timedOut: + XCTFail("timed out waiting for sends") + case .success: + XCTAssertEqual([message1, message2, message3].map { $0.uniqueId }, sentMessages.map { $0.uniqueId }) + } + } + + func test_sendingInvisibleMessage() { + let jobQueue = MessageSenderJobQueue() + jobQueue.setup() + + let message = OutgoingMessageFactory().buildDeliveryReceipt() + let expectation = sentExpectation(message: message) + self.readWrite { transaction in + jobQueue.add(message: message, transaction: transaction) + } + + self.wait(for: [expectation], timeout: 0.1) + } + + func test_retryableFailure() { + let message: TSOutgoingMessage = OutgoingMessageFactory().create() + + let jobQueue = MessageSenderJobQueue() + self.readWrite { transaction in + jobQueue.add(message: message, transaction: transaction) + } + + let finder = JobRecordFinder() + var readyRecords: [SSKJobRecord] = [] + self.readWrite { transaction in + readyRecords = finder.allRecords(label: MessageSenderJobQueue.jobRecordLabel, status: .ready, transaction: transaction) + } + XCTAssertEqual(1, readyRecords.count) + + let jobRecord = readyRecords.first! + XCTAssertEqual(0, jobRecord.failureCount) + + // simulate permanent failure + let error = NSError(domain: "foo", code: 0, userInfo: nil) + error.isRetryable = true + self.messageSender.stubbedFailingError = error + let expectation = sentExpectation(message: message) { + jobQueue.isReady = false + } + + jobQueue.setup() + self.wait(for: [expectation], timeout: 0.1) + + self.readWrite { transaction in + jobRecord.reload(with: transaction) + } + + XCTAssertEqual(1, jobRecord.failureCount) + XCTAssertEqual(.running, jobRecord.status) + + let retryCount: UInt = MessageSenderJobQueue.maxRetries + (1.. Void = { }) -> XCTestExpectation { + let expectation = self.expectation(description: "sent message") + + messageSender.sendMessageWasCalledBlock = { [weak messageSender] sentMessage in + guard sentMessage == message else { + XCTFail("unexpected sentMessage: \(sentMessage)") + return + } + expectation.fulfill() + block() + guard let strongMessageSender = messageSender else { + return + } + strongMessageSender.sendMessageWasCalledBlock = nil + } + + return expectation + } +} diff --git a/SignalServiceKit/tests/Network/MessageSenderJobRecordTest.swift b/SignalServiceKit/tests/Network/MessageSenderJobRecordTest.swift new file mode 100644 index 000000000..a92ec236b --- /dev/null +++ b/SignalServiceKit/tests/Network/MessageSenderJobRecordTest.swift @@ -0,0 +1,44 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation +import XCTest +@testable import SignalServiceKit + +let kMessageSenderJobRecordLabel = "MessageSender" +class SSKMessageSenderJobRecordTest: SSKBaseTestSwift { + + func test_savedVisibleMessage() { + let message = OutgoingMessageFactory().create() + let jobRecord = try! SSKMessageSenderJobRecord(message: message, removeMessageAfterSending: false, label: MessageSenderJobQueue.jobRecordLabel) + XCTAssertNotNil(jobRecord.messageId) + XCTAssertNotNil(jobRecord.threadId) + XCTAssertNil(jobRecord.invisibleMessage) + } + + func test_unsavedVisibleMessage() { + var message: TSOutgoingMessage! + self.readWrite { transaction in + message = OutgoingMessageFactory().build(transaction: transaction) + } + + do { + _ = try SSKMessageSenderJobRecord(message: message, removeMessageAfterSending: false, label: MessageSenderJobQueue.jobRecordLabel) + XCTFail("Should error") + } catch JobRecordError.assertionError { + // expected + } catch { + XCTFail("unexpected error: \(error)") + } + } + + func test_invisibleMessage() { + let message = OutgoingMessageFactory().buildDeliveryReceipt() + + let jobRecord = try! SSKMessageSenderJobRecord(message: message, removeMessageAfterSending: false, label: MessageSenderJobQueue.jobRecordLabel) + XCTAssertNil(jobRecord.messageId) + XCTAssertNotNil(jobRecord.threadId) + XCTAssertNotNil(jobRecord.invisibleMessage) + } +} diff --git a/SignalServiceKit/tests/Util/JobQueueTest.swift b/SignalServiceKit/tests/Util/JobQueueTest.swift new file mode 100644 index 000000000..5765b6dbe --- /dev/null +++ b/SignalServiceKit/tests/Util/JobQueueTest.swift @@ -0,0 +1,194 @@ +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import Foundation + +// +// Copyright (c) 2018 Open Whisper Systems. All rights reserved. +// + +import XCTest + +@testable import SignalServiceKit + +class TestJobRecord: SSKJobRecord { +// override init(label: String) { +// super.init(label: label) +// } +// +// override init(uniqueId: String?) { +// super.init(uniqueId: uniqueId) +// } +// +// required init?(coder: NSCoder) { +// super.init(coder: coder) +// } +// +// required init(dictionary dictionaryValue: [AnyHashable: Any]!) throws { +// try! super.init(dictionary: dictionaryValue) +// } +} + +let kJobRecordLabel = "TestJobRecord" +class TestJobQueue: JobQueue { + + // MARK: JobQueue + + typealias DurableOperationType = TestDurableOperation + var jobRecordLabel: String = kJobRecordLabel + static var maxRetries: UInt = 1 + + func setup() { + defaultSetup() + } + + func didMarkAsReady(oldJobRecord: TestJobRecord, transaction: YapDatabaseReadWriteTransaction) { + // no special handling + } + + var isReady: Bool = false { + didSet { + DispatchQueue.global().async { + self.workStep() + } + } + } + + let operationQueue = OperationQueue() + + func operationQueue(jobRecord: TestJobRecord) -> OperationQueue { + return self.operationQueue + } + + func buildOperation(jobRecord: TestJobRecord, transaction: YapDatabaseReadTransaction) throws -> TestDurableOperation { + return TestDurableOperation(jobRecord: jobRecord, jobBlock: self.jobBlock) + } + + // MARK: + + var jobBlock: (JobRecordType) -> Void = { _ in /* noop */ } + init() { } +} + +class TestDurableOperation: DurableOperation { + + // MARK: DurableOperation + + var jobRecord: TestJobRecord + + var remainingRetries: UInt = 0 + + weak var durableOperationDelegate: TestJobQueue? + + var operation: Operation { + return BlockOperation { self.jobBlock(self.jobRecord) } + } + + // MARK: + + var jobBlock: (TestJobRecord) -> Void + + init(jobRecord: TestJobRecord, jobBlock: @escaping (TestJobRecord) -> Void) { + self.jobRecord = jobRecord + self.jobBlock = jobBlock + } +} + +class JobQueueTest: SSKBaseTestSwift { + + override func setUp() { + super.setUp() + } + + override func tearDown() { + super.tearDown() + } + + // MARK: + + func buildJobRecord() -> TestJobRecord { + return TestJobRecord(label: kJobRecordLabel) + } + + // MARK: + + func test_setupMarksInProgressJobsAsReady() { + + let dispatchGroup = DispatchGroup() + + let jobQueue = TestJobQueue() + let jobRecord1 = buildJobRecord() + let jobRecord2 = buildJobRecord() + let jobRecord3 = buildJobRecord() + + var runList: [TestJobRecord] = [] + + jobQueue.jobBlock = { jobRecord in + runList.append(jobRecord) + dispatchGroup.leave() + } + + self.readWrite { transaction in + jobQueue.add(jobRecord: jobRecord1, transaction: transaction) + jobQueue.add(jobRecord: jobRecord2, transaction: transaction) + jobQueue.add(jobRecord: jobRecord3, transaction: transaction) + } + dispatchGroup.enter() + dispatchGroup.enter() + dispatchGroup.enter() + + let finder = JobRecordFinder() + self.readWrite { transaction in + XCTAssertEqual(3, finder.allRecords(label: kJobRecordLabel, status: .ready, transaction: transaction).count) + } + + // start queue + jobQueue.setup() + + if case .timedOut = dispatchGroup.wait(timeout: .now() + 1.0) { + XCTFail("timed out waiting for jobs") + } + + // Normally an operation enqueued for a JobRecord by a JobQueue will mark itself as complete + // by deleting itself. + // For testing, the operations enqueued by the TestJobQueue do *not* delete themeselves upon + // completion, simulating an operation which never compeleted. + + self.readWrite { transaction in + XCTAssertEqual(0, finder.allRecords(label: kJobRecordLabel, status: .ready, transaction: transaction).count) + XCTAssertEqual(3, finder.allRecords(label: kJobRecordLabel, status: .running, transaction: transaction).count) + } + + // Verify re-queue + + jobQueue.isReady = false + jobQueue.setup() + + self.readWrite { transaction in + XCTAssertEqual(3, finder.allRecords(label: kJobRecordLabel, status: .ready, transaction: transaction).count) + XCTAssertEqual(0, finder.allRecords(label: kJobRecordLabel, status: .running, transaction: transaction).count) + } + + let rerunGroup = DispatchGroup() + rerunGroup.enter() + rerunGroup.enter() + rerunGroup.enter() + + var rerunList: [TestJobRecord] = [] + jobQueue.jobBlock = { jobRecord in + rerunList.append(jobRecord) + rerunGroup.leave() + } + + jobQueue.isReady = true + + switch rerunGroup.wait(timeout: .now() + 1.0) { + case .timedOut: + XCTFail("timed out waiting for retry") + case .success: + // verify order maintained on requeue + XCTAssertEqual([jobRecord1, jobRecord2, jobRecord3].map { $0.uniqueId }, rerunList.map { $0.uniqueId }) + } + } +}