// // Copyright (c) 2018 Open Whisper Systems. All rights reserved. // #import "OWSOutgoingReceiptManager.h" #import #import "SSKEnvironment.h" #import "AppReadiness.h" #import "OWSPrimaryStorage.h" #import "TSContactThread.h" #import "TSYapDatabaseObject.h" #import #import #import #import NS_ASSUME_NONNULL_BEGIN typedef NS_ENUM(NSUInteger, OWSReceiptType) { OWSReceiptType_Delivery, OWSReceiptType_Read, }; NSString *const kOutgoingDeliveryReceiptManagerCollection = @"kOutgoingDeliveryReceiptManagerCollection"; NSString *const kOutgoingReadReceiptManagerCollection = @"kOutgoingReadReceiptManagerCollection"; @interface OWSOutgoingReceiptManager () @property (nonatomic, readonly) YapDatabaseConnection *dbConnection; @property (nonatomic) Reachability *reachability; // This property should only be accessed on the serialQueue. @property (nonatomic) BOOL isProcessing; @end #pragma mark - @implementation OWSOutgoingReceiptManager + (instancetype)sharedManager { return SSKEnvironment.shared.outgoingReceiptManager; } - (instancetype)initWithPrimaryStorage:(OWSPrimaryStorage *)primaryStorage { self = [super init]; if (!self) { return self; } self.reachability = [Reachability reachabilityForInternetConnection]; _dbConnection = primaryStorage.newDatabaseConnection; [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(reachabilityChanged) name:kReachabilityChangedNotification object:nil]; // Start processing. [AppReadiness runNowOrWhenAppDidBecomeReady:^{ [self process]; }]; return self; } - (void)dealloc { [[NSNotificationCenter defaultCenter] removeObserver:self]; } #pragma mark - - (dispatch_queue_t)serialQueue { static dispatch_queue_t _serialQueue; static dispatch_once_t onceToken; dispatch_once(&onceToken, ^{ _serialQueue = dispatch_queue_create("org.whispersystems.outgoingReceipts", DISPATCH_QUEUE_SERIAL); }); return _serialQueue; } // Schedules a processing pass, unless one is already scheduled. - (void)process { dispatch_async(self.serialQueue, ^{ if (self.isProcessing) { return; } self.isProcessing = YES; if (!self.reachability.isReachable) { // No network availability; abort. self.isProcessing = NO; return; } NSMutableArray *sendPromises = [NSMutableArray array]; [sendPromises addObjectsFromArray:[self sendReceiptsForReceiptType:OWSReceiptType_Read]]; if (sendPromises.count < 1) { // No work to do; abort. self.isProcessing = NO; return; } AnyPromise *completionPromise = PMKJoin(sendPromises); completionPromise.ensure(^() { // Wait N seconds before conducting another pass. // This allows time for a batch to accumulate. // // We want a value high enough to allow us to effectively de-duplicate // receipts without being so high that we incur so much latency that // the user notices. const CGFloat kProcessingFrequencySeconds = 3.f; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)), self.serialQueue, ^{ self.isProcessing = NO; [self process]; }); }); [completionPromise retainUntilComplete]; }); } - (NSArray *)sendReceiptsForReceiptType:(OWSReceiptType)receiptType { NSString *collection = [self collectionForReceiptType:receiptType]; NSMutableDictionary *> *queuedReceiptMap = [NSMutableDictionary new]; [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) { [transaction enumerateKeysAndObjectsInCollection:collection usingBlock:^(NSString *key, id object, BOOL *stop) { NSString *recipientId = key; NSSet *timestamps = object; queuedReceiptMap[recipientId] = [timestamps copy]; }]; }]; NSMutableArray *sendPromises = [NSMutableArray array]; for (NSString *recipientId in queuedReceiptMap) { NSSet *timestampsAsSet = queuedReceiptMap[recipientId]; if (timestampsAsSet.count < 1) { continue; } TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId]; if (thread.isGroupThread) { // Don't send receipts in group threads continue; } SNReadReceipt *readReceipt = [SNReadReceipt new]; NSMutableArray *timestamps = [NSMutableArray new]; for (NSNumber *timestamp in timestampsAsSet) { [timestamps addObject:timestamp]; } readReceipt.timestamps = timestamps; [LKStorage writeWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { AnyPromise *promise = [SNMessageSender sendNonDurably:readReceipt inThread:thread usingTransaction:transaction] .thenOn(self.serialQueue, ^(id object) { [self dequeueReceiptsWithRecipientId:recipientId timestamps:timestampsAsSet receiptType:@"Read"]; }); [sendPromises addObject:promise]; }]; } return [sendPromises copy]; } - (void)enqueueDeliveryReceiptForEnvelope:(SNProtoEnvelope *)envelope { [self enqueueReceiptWithRecipientId:envelope.source timestamp:envelope.timestamp receiptType:OWSReceiptType_Delivery]; } - (void)enqueueReadReceiptForEnvelope:(NSString *)messageAuthorId timestamp:(uint64_t)timestamp { [self enqueueReceiptWithRecipientId:messageAuthorId timestamp:timestamp receiptType:OWSReceiptType_Read]; } - (void)enqueueReceiptWithRecipientId:(NSString *)recipientId timestamp:(uint64_t)timestamp receiptType:(OWSReceiptType)receiptType { NSString *collection = [self collectionForReceiptType:receiptType]; if (recipientId.length < 1) { return; } if (timestamp < 1) { return; } dispatch_async(self.serialQueue, ^{ [LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { NSSet *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection]; NSMutableSet *newTimestamps = (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]); [newTimestamps addObject:@(timestamp)]; [transaction setObject:newTimestamps forKey:recipientId inCollection:collection]; }]; [self process]; }); } - (void)dequeueReceiptsWithRecipientId:(NSString *)recipientId timestamps:(NSSet *)timestamps receiptType:(OWSReceiptType)receiptType { NSString *collection = [self collectionForReceiptType:receiptType]; if (recipientId.length < 1) { return; } if (timestamps.count < 1) { return; } dispatch_async(self.serialQueue, ^{ [LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { NSSet *_Nullable oldTimestamps = [transaction objectForKey:recipientId inCollection:collection]; NSMutableSet *newTimestamps = (oldTimestamps ? [oldTimestamps mutableCopy] : [NSMutableSet new]); [newTimestamps minusSet:timestamps]; if (newTimestamps.count > 0) { [transaction setObject:newTimestamps forKey:recipientId inCollection:collection]; } else { [transaction removeObjectForKey:recipientId inCollection:collection]; } }]; }); } - (void)reachabilityChanged { [self process]; } - (NSString *)collectionForReceiptType:(OWSReceiptType)receiptType { switch (receiptType) { case OWSReceiptType_Delivery: return kOutgoingDeliveryReceiptManagerCollection; case OWSReceiptType_Read: return kOutgoingReadReceiptManagerCollection; } } @end NS_ASSUME_NONNULL_END