// // Copyright (c) 2019 Open Whisper Systems. All rights reserved. // #import "OWSReadReceiptManager.h" #import "AppReadiness.h" #import "OWSLinkedDeviceReadReceipt.h" #import "OWSMessageSender.h" #import "OWSOutgoingReceiptManager.h" #import "OWSPrimaryStorage.h" #import "OWSReadReceiptsForLinkedDevicesMessage.h" #import "OWSReceiptsForSenderMessage.h" #import "OWSStorage.h" #import "SSKEnvironment.h" #import "TSAccountManager.h" #import "TSContactThread.h" #import "TSDatabaseView.h" #import "TSIncomingMessage.h" #import "YapDatabaseConnection+OWS.h" #import #import #import #import NS_ASSUME_NONNULL_BEGIN NSString *const kIncomingMessageMarkedAsReadNotification = @"kIncomingMessageMarkedAsReadNotification"; @interface TSRecipientReadReceipt : TSYapDatabaseObject @property (nonatomic, readonly) uint64_t sentTimestamp; // Map of "recipient id"-to-"read timestamp". @property (nonatomic, readonly) NSDictionary *recipientMap; @end #pragma mark - @implementation TSRecipientReadReceipt + (NSString *)collection { return @"TSRecipientReadReceipt2"; } - (instancetype)initWithSentTimestamp:(uint64_t)sentTimestamp { OWSAssertDebug(sentTimestamp > 0); self = [super initWithUniqueId:[TSRecipientReadReceipt uniqueIdForSentTimestamp:sentTimestamp]]; if (self) { _sentTimestamp = sentTimestamp; _recipientMap = [NSDictionary new]; } return self; } + (NSString *)uniqueIdForSentTimestamp:(uint64_t)timestamp { return [NSString stringWithFormat:@"%llu", timestamp]; } - (void)addRecipientId:(NSString *)recipientId timestamp:(uint64_t)timestamp { OWSAssertDebug(recipientId.length > 0); OWSAssertDebug(timestamp > 0); NSMutableDictionary *recipientMapCopy = [self.recipientMap mutableCopy]; recipientMapCopy[recipientId] = @(timestamp); _recipientMap = [recipientMapCopy copy]; } + (void)addRecipientId:(NSString *)recipientId sentTimestamp:(uint64_t)sentTimestamp readTimestamp:(uint64_t)readTimestamp transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(transaction); TSRecipientReadReceipt *_Nullable recipientReadReceipt = [transaction objectForKey:[self uniqueIdForSentTimestamp:sentTimestamp] inCollection:[self collection]]; if (!recipientReadReceipt) { recipientReadReceipt = [[TSRecipientReadReceipt alloc] initWithSentTimestamp:sentTimestamp]; } [recipientReadReceipt addRecipientId:recipientId timestamp:readTimestamp]; [recipientReadReceipt saveWithTransaction:transaction]; } + (nullable NSDictionary *)recipientMapForSentTimestamp:(uint64_t)sentTimestamp transaction: (YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(transaction); TSRecipientReadReceipt *_Nullable recipientReadReceipt = [transaction objectForKey:[self uniqueIdForSentTimestamp:sentTimestamp] inCollection:[self collection]]; return recipientReadReceipt.recipientMap; } + (void)removeRecipientIdsForTimestamp:(uint64_t)sentTimestamp transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(transaction); [transaction removeObjectForKey:[self uniqueIdForSentTimestamp:sentTimestamp] inCollection:[self collection]]; } @end #pragma mark - NSString *const OWSReadReceiptManagerCollection = @"OWSReadReceiptManagerCollection"; NSString *const OWSReadReceiptManagerAreReadReceiptsEnabled = @"areReadReceiptsEnabled"; @interface OWSReadReceiptManager () @property (nonatomic, readonly) YapDatabaseConnection *dbConnection; // A map of "thread unique id"-to-"read receipt" for read receipts that // we will send to our linked devices. // // Should only be accessed while synchronized on the OWSReadReceiptManager. @property (nonatomic, readonly) NSMutableDictionary *toLinkedDevicesReadReceiptMap; // Should only be accessed while synchronized on the OWSReadReceiptManager. @property (nonatomic) BOOL isProcessing; @property (atomic) NSNumber *areReadReceiptsEnabledCached; @end #pragma mark - @implementation OWSReadReceiptManager + (instancetype)sharedManager { OWSAssert(SSKEnvironment.shared.readReceiptManager); return SSKEnvironment.shared.readReceiptManager; } - (instancetype)initWithPrimaryStorage:(OWSPrimaryStorage *)primaryStorage { self = [super init]; if (!self) { return self; } _dbConnection = primaryStorage.newDatabaseConnection; _toLinkedDevicesReadReceiptMap = [NSMutableDictionary new]; OWSSingletonAssert(); // Start processing. [AppReadiness runNowOrWhenAppDidBecomeReady:^{ [self scheduleProcessing]; }]; return self; } - (void)dealloc { [[NSNotificationCenter defaultCenter] removeObserver:self]; } #pragma mark - Dependencies - (SSKMessageSenderJobQueue *)messageSenderJobQueue { return SSKEnvironment.shared.messageSenderJobQueue; } - (OWSOutgoingReceiptManager *)outgoingReceiptManager { OWSAssertDebug(SSKEnvironment.shared.outgoingReceiptManager); return SSKEnvironment.shared.outgoingReceiptManager; } #pragma mark - // Schedules a processing pass, unless one is already scheduled. - (void)scheduleProcessing { OWSAssertDebug(AppReadiness.isAppReady); dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ @synchronized(self) { if (self.isProcessing) { return; } self.isProcessing = YES; [self process]; } }); } - (void)process { @synchronized(self) { OWSLogVerbose(@"Processing read receipts."); NSArray *readReceiptsForLinkedDevices = [self.toLinkedDevicesReadReceiptMap allValues]; [self.toLinkedDevicesReadReceiptMap removeAllObjects]; if (readReceiptsForLinkedDevices.count > 0) { OWSReadReceiptsForLinkedDevicesMessage *message = [[OWSReadReceiptsForLinkedDevicesMessage alloc] initWithReadReceipts:readReceiptsForLinkedDevices]; [LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *_Nonnull transaction) { [self.messageSenderJobQueue addMessage:message transaction:transaction]; } error:nil]; } BOOL didWork = readReceiptsForLinkedDevices.count > 0; if (didWork) { // Wait N seconds before processing read receipts again. // This allows time for a batch to accumulate. // // We want a value high enough to allow us to effectively de-duplicate, // read receipts without being so high that we risk not sending read // receipts due to app exit. const CGFloat kProcessingFrequencySeconds = 3.f; dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(kProcessingFrequencySeconds * NSEC_PER_SEC)), dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [self process]; }); } else { self.isProcessing = NO; } } } #pragma mark - Mark as Read Locally - (void)markAsReadLocallyBeforeSortId:(uint64_t)sortId thread:(TSThread *)thread { OWSAssertDebug(thread); dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { [self markAsReadBeforeSortId:sortId thread:thread readTimestamp:[NSDate ows_millisecondTimeStamp] wasLocal:YES transaction:transaction]; } error:nil]; }); } - (void)messageWasReadLocally:(TSIncomingMessage *)message { dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ @synchronized(self) { NSString *threadUniqueId = message.uniqueThreadId; OWSAssertDebug(threadUniqueId.length > 0); NSString *messageAuthorId = message.authorId; OWSAssertDebug(messageAuthorId.length > 0); OWSLinkedDeviceReadReceipt *newReadReceipt = [[OWSLinkedDeviceReadReceipt alloc] initWithSenderId:messageAuthorId messageIdTimestamp:message.timestamp readTimestamp:[NSDate ows_millisecondTimeStamp]]; OWSLinkedDeviceReadReceipt *_Nullable oldReadReceipt = self.toLinkedDevicesReadReceiptMap[threadUniqueId]; if (oldReadReceipt && oldReadReceipt.messageIdTimestamp > newReadReceipt.messageIdTimestamp) { // If there's an existing "linked device" read receipt for the same thread with // a newer timestamp, discard this "linked device" read receipt. OWSLogVerbose(@"Ignoring redundant read receipt for linked devices."); } else { OWSLogVerbose(@"Enqueuing read receipt for linked devices."); self.toLinkedDevicesReadReceiptMap[threadUniqueId] = newReadReceipt; } if (![LKSessionMetaProtocol shouldSendReceiptForThread:message.thread]) { return; } if ([self areReadReceiptsEnabled]) { OWSLogVerbose(@"Enqueuing read receipt for sender."); [self.outgoingReceiptManager enqueueReadReceiptForEnvelope:messageAuthorId timestamp:message.timestamp]; } [self scheduleProcessing]; } }); } #pragma mark - Read Receipts From Recipient - (void)processReadReceiptsFromRecipientId:(NSString *)recipientId sentTimestamps:(NSArray *)sentTimestamps readTimestamp:(uint64_t)readTimestamp { OWSAssertDebug(recipientId.length > 0); OWSAssertDebug(sentTimestamps); if (![self areReadReceiptsEnabled]) { OWSLogInfo(@"Ignoring incoming receipt message as read receipts are disabled."); return; } dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [LKStorage writeSyncWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { for (NSNumber *nsSentTimestamp in sentTimestamps) { UInt64 sentTimestamp = [nsSentTimestamp unsignedLongLongValue]; NSArray *messages = (NSArray *)[TSInteraction interactionsWithTimestamp:sentTimestamp ofClass:[TSOutgoingMessage class] withTransaction:transaction]; if (messages.count > 1) { OWSLogError(@"More than one matching message with timestamp: %llu.", sentTimestamp); } if (messages.count > 0) { // TODO: We might also need to "mark as read by recipient" any older messages // from us in that thread. Or maybe this state should hang on the thread? for (TSOutgoingMessage *message in messages) { [message updateWithReadRecipientId:recipientId readTimestamp:readTimestamp transaction:transaction]; } } else { // Persist the read receipts so that we can apply them to outgoing messages // that we learn about later through sync messages. [TSRecipientReadReceipt addRecipientId:recipientId sentTimestamp:sentTimestamp readTimestamp:readTimestamp transaction:transaction]; } } } error:nil]; }); } - (void)applyEarlyReadReceiptsForOutgoingMessageFromLinkedDevice:(TSOutgoingMessage *)message transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(message); OWSAssertDebug(transaction); uint64_t sentTimestamp = message.timestamp; NSDictionary *recipientMap = [TSRecipientReadReceipt recipientMapForSentTimestamp:sentTimestamp transaction:transaction]; if (!recipientMap) { return; } OWSAssertDebug(recipientMap.count > 0); for (NSString *recipientId in recipientMap) { NSNumber *nsReadTimestamp = recipientMap[recipientId]; OWSAssertDebug(nsReadTimestamp); uint64_t readTimestamp = [nsReadTimestamp unsignedLongLongValue]; [message updateWithReadRecipientId:recipientId readTimestamp:readTimestamp transaction:transaction]; } [TSRecipientReadReceipt removeRecipientIdsForTimestamp:message.timestamp transaction:transaction]; } #pragma mark - Linked Device Read Receipts - (void)applyEarlyReadReceiptsForIncomingMessage:(TSIncomingMessage *)message transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(message); OWSAssertDebug(transaction); NSString *senderId = message.authorId; uint64_t timestamp = message.timestamp; if (senderId.length < 1 || timestamp < 1) { OWSFailDebug(@"Invalid incoming message: %@ %llu", senderId, timestamp); return; } OWSLinkedDeviceReadReceipt *_Nullable readReceipt = [OWSLinkedDeviceReadReceipt findLinkedDeviceReadReceiptWithSenderId:senderId messageIdTimestamp:timestamp transaction:transaction]; if (!readReceipt) { return; } [message markAsReadAtTimestamp:readReceipt.readTimestamp sendReadReceipt:NO transaction:transaction]; [readReceipt removeWithTransaction:transaction]; } - (void)processReadReceiptsFromLinkedDevice:(NSArray *)readReceiptProtos readTimestamp:(uint64_t)readTimestamp transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(readReceiptProtos); OWSAssertDebug(transaction); for (SSKProtoSyncMessageRead *readReceiptProto in readReceiptProtos) { NSString *_Nullable senderId = readReceiptProto.sender; uint64_t messageIdTimestamp = readReceiptProto.timestamp; if (senderId.length == 0) { OWSFailDebug(@"senderId was unexpectedly nil"); continue; } if (messageIdTimestamp == 0) { OWSFailDebug(@"messageIdTimestamp was unexpectedly 0"); continue; } NSArray *messages = (NSArray *)[TSInteraction interactionsWithTimestamp:messageIdTimestamp ofClass:[TSIncomingMessage class] withTransaction:transaction]; if (messages.count > 0) { for (TSIncomingMessage *message in messages) { NSTimeInterval secondsSinceRead = [NSDate new].timeIntervalSince1970 - readTimestamp / 1000; OWSAssertDebug([message isKindOfClass:[TSIncomingMessage class]]); OWSLogDebug(@"read on linked device %f seconds ago", secondsSinceRead); [self markAsReadOnLinkedDevice:message readTimestamp:readTimestamp transaction:transaction]; } } else { // Received read receipt for unknown incoming message. // Persist in case we receive the incoming message later. OWSLinkedDeviceReadReceipt *readReceipt = [[OWSLinkedDeviceReadReceipt alloc] initWithSenderId:senderId messageIdTimestamp:messageIdTimestamp readTimestamp:readTimestamp]; [readReceipt saveWithTransaction:transaction]; } } } - (void)markAsReadOnLinkedDevice:(TSIncomingMessage *)message readTimestamp:(uint64_t)readTimestamp transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(message); OWSAssertDebug(transaction); // Always re-mark the message as read to ensure any earlier read time is applied to disappearing messages. [message markAsReadAtTimestamp:readTimestamp sendReadReceipt:NO transaction:transaction]; // Also mark any unread messages appearing earlier in the thread as read as well. [self markAsReadBeforeSortId:message.sortId thread:[message threadWithTransaction:transaction] readTimestamp:readTimestamp wasLocal:NO transaction:transaction]; } #pragma mark - Mark As Read - (void)markAsReadBeforeSortId:(uint64_t)sortId thread:(TSThread *)thread readTimestamp:(uint64_t)readTimestamp wasLocal:(BOOL)wasLocal transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssertDebug(sortId > 0); OWSAssertDebug(thread); OWSAssertDebug(transaction); NSMutableArray> *newlyReadList = [NSMutableArray new]; [[TSDatabaseView unseenDatabaseViewExtension:transaction] enumerateKeysAndObjectsInGroup:thread.uniqueId usingBlock:^(NSString *collection, NSString *key, id object, NSUInteger index, BOOL *stop) { if (![object conformsToProtocol:@protocol(OWSReadTracking)]) { OWSFailDebug( @"Expected to conform to OWSReadTracking: object with class: %@ collection: %@ " @"key: %@", [object class], collection, key); return; } id possiblyRead = (id)object; if (possiblyRead.sortId > sortId) { *stop = YES; return; } OWSAssertDebug(!possiblyRead.read); OWSAssertDebug(possiblyRead.expireStartedAt == 0); if (!possiblyRead.read) { [newlyReadList addObject:possiblyRead]; } }]; if (newlyReadList.count < 1) { return; } if (wasLocal) { OWSLogError(@"Marking %lu messages as read locally.", (unsigned long)newlyReadList.count); } else { OWSLogError(@"Marking %lu messages as read by linked device.", (unsigned long)newlyReadList.count); } for (id readItem in newlyReadList) { [readItem markAsReadAtTimestamp:readTimestamp sendReadReceipt:wasLocal transaction:transaction]; } } #pragma mark - Settings - (void)prepareCachedValues { [self areReadReceiptsEnabled]; } - (BOOL)areReadReceiptsEnabled { // We don't need to worry about races around this cached value. if (!self.areReadReceiptsEnabledCached) { self.areReadReceiptsEnabledCached = @([self.dbConnection boolForKey:OWSReadReceiptManagerAreReadReceiptsEnabled inCollection:OWSReadReceiptManagerCollection defaultValue:NO]); } return [self.areReadReceiptsEnabledCached boolValue]; } - (void)setAreReadReceiptsEnabled:(BOOL)value { OWSLogInfo(@"setAreReadReceiptsEnabled: %d.", value); [self.dbConnection setBool:value forKey:OWSReadReceiptManagerAreReadReceiptsEnabled inCollection:OWSReadReceiptManagerCollection]; [SSKEnvironment.shared.syncManager sendConfigurationSyncMessage]; self.areReadReceiptsEnabledCached = @(value); } @end NS_ASSUME_NONNULL_END