Fix race condition with read receipts before incoming message
// FREEBIE
This commit is contained in:
parent
2de6927453
commit
0933b92128
|
@ -1,8 +1,12 @@
|
|||
// Copyright © 2016 Open Whisper Systems. All rights reserved.
|
||||
|
||||
#import "TSYapDatabaseObject.h"
|
||||
|
||||
NS_ASSUME_NONNULL_BEGIN
|
||||
|
||||
@interface OWSReadReceipt : NSObject
|
||||
@class YapDatabase;
|
||||
|
||||
@interface OWSReadReceipt : TSYapDatabaseObject
|
||||
|
||||
@property (nonatomic, readonly) NSString *senderId;
|
||||
@property (nonatomic, readonly) uint64_t timestamp;
|
||||
|
@ -11,6 +15,9 @@ NS_ASSUME_NONNULL_BEGIN
|
|||
|
||||
- (instancetype)initWithSenderId:(NSString *)senderId timestamp:(uint64_t)timestamp;
|
||||
|
||||
+ (nullable instancetype)firstWithSenderId:(NSString *)senderId timestamp:(uint64_t)timestamp;
|
||||
+ (void)registerIndexOnSenderIdAndTimestampWithDatabase:(YapDatabase *)database;
|
||||
|
||||
@end
|
||||
|
||||
NS_ASSUME_NONNULL_END
|
|
@ -1,9 +1,16 @@
|
|||
// Copyright © 2016 Open Whisper Systems. All rights reserved.
|
||||
|
||||
#import "OWSReadReceipt.h"
|
||||
#import <YapDatabase/YapDatabase.h>
|
||||
#import <YapDatabase/YapDatabaseConnection.h>
|
||||
#import <YapDatabase/YapDatabaseSecondaryIndex.h>
|
||||
|
||||
NS_ASSUME_NONNULL_BEGIN
|
||||
|
||||
NSString *const IndexOnSenderIdAndTimestamp = @"OWSReadReceiptIndexOnSenderIdAndTimestamp";
|
||||
NSString *const TimestampColumn = @"timestamp";
|
||||
NSString *const SenderIdColumn = @"senderId";
|
||||
|
||||
@implementation OWSReadReceipt
|
||||
|
||||
- (instancetype)initWithSenderId:(NSString *)senderId timestamp:(uint64_t)timestamp;
|
||||
|
@ -30,6 +37,87 @@ NS_ASSUME_NONNULL_BEGIN
|
|||
return self;
|
||||
}
|
||||
|
||||
- (instancetype)initWithCoder:(NSCoder *)decoder
|
||||
{
|
||||
self = [super initWithCoder:decoder];
|
||||
if (!self) {
|
||||
return nil;
|
||||
}
|
||||
|
||||
_valid = YES;
|
||||
_validationErrorMessages = @[];
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
+ (NSDictionary *)encodingBehaviorsByPropertyKey
|
||||
{
|
||||
NSMutableDictionary *behaviorsByPropertyKey = [[super encodingBehaviorsByPropertyKey] mutableCopy];
|
||||
|
||||
// Don't persist transient properties used in validation.
|
||||
behaviorsByPropertyKey[@"valid"] = @(MTLModelEncodingBehaviorExcluded);
|
||||
behaviorsByPropertyKey[@"validationErrorMessages"] = @(MTLModelEncodingBehaviorExcluded);
|
||||
|
||||
return [behaviorsByPropertyKey copy];
|
||||
}
|
||||
|
||||
+ (void)registerIndexOnSenderIdAndTimestampWithDatabase:(YapDatabase *)database
|
||||
{
|
||||
YapDatabaseSecondaryIndexSetup *setup = [YapDatabaseSecondaryIndexSetup new];
|
||||
[setup addColumn:SenderIdColumn withType:YapDatabaseSecondaryIndexTypeText];
|
||||
[setup addColumn:TimestampColumn withType:YapDatabaseSecondaryIndexTypeInteger];
|
||||
|
||||
YapDatabaseSecondaryIndexHandler *handler =
|
||||
[YapDatabaseSecondaryIndexHandler withObjectBlock:^(YapDatabaseReadTransaction *transaction,
|
||||
NSMutableDictionary *dict,
|
||||
NSString *collection,
|
||||
NSString *key,
|
||||
id object) {
|
||||
if ([object isKindOfClass:[OWSReadReceipt class]]) {
|
||||
OWSReadReceipt *readReceipt = (OWSReadReceipt *)object;
|
||||
dict[SenderIdColumn] = readReceipt.senderId;
|
||||
dict[TimestampColumn] = @(readReceipt.timestamp);
|
||||
}
|
||||
}];
|
||||
|
||||
YapDatabaseSecondaryIndex *index = [[YapDatabaseSecondaryIndex alloc] initWithSetup:setup handler:handler];
|
||||
[database registerExtension:index withName:IndexOnSenderIdAndTimestamp];
|
||||
}
|
||||
|
||||
+ (nullable instancetype)firstWithSenderId:(NSString *)senderId timestamp:(uint64_t)timestamp
|
||||
{
|
||||
__block OWSReadReceipt *foundReadReceipt;
|
||||
|
||||
NSString *queryFormat = [NSString stringWithFormat:@"WHERE %@ = ? AND %@ = ?", SenderIdColumn, TimestampColumn];
|
||||
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:queryFormat, senderId, @(timestamp)];
|
||||
|
||||
[[self dbConnection] readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
|
||||
[[transaction ext:IndexOnSenderIdAndTimestamp]
|
||||
enumerateKeysAndObjectsMatchingQuery:query
|
||||
usingBlock:^(NSString *collection, NSString *key, id object, BOOL *stop) {
|
||||
if (![object isKindOfClass:[OWSReadReceipt class]]) {
|
||||
DDLogError(@"%@ Unexpected object in index: %@", self.tag, object);
|
||||
return;
|
||||
}
|
||||
|
||||
foundReadReceipt = (OWSReadReceipt *)object;
|
||||
*stop = YES;
|
||||
}];
|
||||
}];
|
||||
|
||||
return foundReadReceipt;
|
||||
}
|
||||
|
||||
+ (NSString *)tag
|
||||
{
|
||||
return [NSString stringWithFormat:@"[%@]", self.class];
|
||||
}
|
||||
|
||||
- (NSString *)tag
|
||||
{
|
||||
return self.class.tag;
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
NS_ASSUME_NONNULL_END
|
||||
|
|
|
@ -3,11 +3,16 @@
|
|||
NS_ASSUME_NONNULL_BEGIN
|
||||
|
||||
@class OWSSignalServiceProtosSyncMessageRead;
|
||||
@class OWSReadReceipt;
|
||||
@class TSIncomingMessage;
|
||||
|
||||
@interface OWSReadReceiptsProcessor : NSObject
|
||||
|
||||
- (instancetype)initWithReadReceiptProtos:(NSArray<OWSSignalServiceProtosSyncMessageRead *> *)readReceiptProtos
|
||||
NS_DESIGNATED_INITIALIZER;
|
||||
- (instancetype)initWithReadReceiptProtos:(NSArray<OWSSignalServiceProtosSyncMessageRead *> *)readReceiptProtos;
|
||||
- (instancetype)initWithIncomingMessage:(TSIncomingMessage *)incomingMessage;
|
||||
- (instancetype)initWithReadReceipts:(NSArray<OWSReadReceipt *> *)readReceipts NS_DESIGNATED_INITIALIZER;
|
||||
- (instancetype)init NS_UNAVAILABLE;
|
||||
|
||||
- (void)process;
|
||||
|
||||
@end
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
#import "OWSReadReceiptsProcessor.h"
|
||||
#import "OWSReadReceipt.h"
|
||||
#import "OWSSignalServiceProtos.pb.h"
|
||||
#import "TSContactThread.h"
|
||||
#import "TSIncomingMessage.h"
|
||||
|
||||
NS_ASSUME_NONNULL_BEGIN
|
||||
|
@ -15,50 +16,81 @@ NS_ASSUME_NONNULL_BEGIN
|
|||
|
||||
@implementation OWSReadReceiptsProcessor
|
||||
|
||||
- (instancetype)init
|
||||
{
|
||||
return [self initWithReadReceiptProtos:@[]];
|
||||
}
|
||||
|
||||
- (instancetype)initWithReadReceiptProtos:(NSArray<OWSSignalServiceProtosSyncMessageRead *> *)readReceiptProtos
|
||||
- (instancetype)initWithReadReceipts:(NSArray<OWSReadReceipt *> *)readReceipts
|
||||
{
|
||||
self = [super init];
|
||||
if (!self) {
|
||||
return self;
|
||||
}
|
||||
|
||||
NSMutableArray<OWSReadReceipt *> *readReceipts = [NSMutableArray new];
|
||||
for (OWSSignalServiceProtosSyncMessageRead *readReceiptProto in readReceiptProtos) {
|
||||
OWSReadReceipt *readReceipt =
|
||||
[[OWSReadReceipt alloc] initWithSenderId:readReceiptProto.sender timestamp:readReceiptProto.timestamp];
|
||||
if (readReceipt.isValid) {
|
||||
[readReceipts addObject:readReceipt];
|
||||
} else {
|
||||
DDLogError(@"Received invalid read receipt: %@", readReceipt.validationErrorMessages);
|
||||
}
|
||||
}
|
||||
|
||||
_readReceipts = [readReceipts copy];
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
- (instancetype)initWithReadReceiptProtos:(NSArray<OWSSignalServiceProtosSyncMessageRead *> *)readReceiptProtos
|
||||
{
|
||||
NSMutableArray<OWSReadReceipt *> *readReceipts = [NSMutableArray new];
|
||||
for (OWSSignalServiceProtosSyncMessageRead *readReceiptProto in readReceiptProtos) {
|
||||
OWSReadReceipt *readReceipt =
|
||||
[[OWSReadReceipt alloc] initWithSenderId:readReceiptProto.sender timestamp:readReceiptProto.timestamp];
|
||||
if (readReceipt.isValid) {
|
||||
[readReceipts addObject:readReceipt];
|
||||
} else {
|
||||
DDLogError(@"%@ Received invalid read receipt: %@", self.tag, readReceipt.validationErrorMessages);
|
||||
}
|
||||
}
|
||||
|
||||
return [self initWithReadReceipts:[readReceipts copy]];
|
||||
}
|
||||
|
||||
- (instancetype)initWithIncomingMessage:(TSIncomingMessage *)message
|
||||
{
|
||||
// Only groupthread sets authorId, thus this crappy code.
|
||||
// TODO ALL incoming messages should have an authorId.
|
||||
NSString *messageAuthorId;
|
||||
if (message.authorId) { // Group Thread
|
||||
messageAuthorId = message.authorId;
|
||||
} else { // Contact Thread
|
||||
messageAuthorId = [TSContactThread contactIdFromThreadId:message.uniqueThreadId];
|
||||
}
|
||||
|
||||
OWSReadReceipt *readReceipt = [OWSReadReceipt firstWithSenderId:messageAuthorId timestamp:message.timestamp];
|
||||
if (readReceipt) {
|
||||
DDLogInfo(@"%@ Found prior read receipt for incoming message.", self.tag);
|
||||
return [self initWithReadReceipts:@[ readReceipt ]];
|
||||
} else {
|
||||
return [self initWithReadReceipts:@[]];
|
||||
}
|
||||
}
|
||||
|
||||
- (void)process
|
||||
{
|
||||
DDLogInfo(@"Processing %ld read receipts.", (unsigned long)self.readReceipts.count);
|
||||
DDLogDebug(@"%@ Processing %ld read receipts.", self.tag, (unsigned long)self.readReceipts.count);
|
||||
for (OWSReadReceipt *readReceipt in self.readReceipts) {
|
||||
TSIncomingMessage *message =
|
||||
[TSIncomingMessage findMessageWithAuthorId:readReceipt.senderId timestamp:readReceipt.timestamp];
|
||||
if (message) {
|
||||
[message markAsReadFromReadReceipt];
|
||||
// If it was previously saved, no need to keep it around any longer.
|
||||
[readReceipt remove];
|
||||
} else {
|
||||
// TODO keep read receipts around so that if we get the receipt before the message,
|
||||
// we can immediately mark the message as read once we get it.
|
||||
DDLogWarn(@"Couldn't find message for read receipt. Message not synced?");
|
||||
DDLogDebug(@"%@ Received read receipt for an unkown message. Saving it for later.", self.tag);
|
||||
[readReceipt save];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+ (NSString *)tag
|
||||
{
|
||||
return [NSString stringWithFormat:@"[%@]", self.class];
|
||||
}
|
||||
|
||||
- (NSString *)tag
|
||||
{
|
||||
return self.class.tag;
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
NS_ASSUME_NONNULL_END
|
||||
|
|
|
@ -430,6 +430,12 @@
|
|||
NSString *name = [thread name];
|
||||
|
||||
if (incomingMessage && thread) {
|
||||
// TODO Delay by 100ms?
|
||||
|
||||
OWSReadReceiptsProcessor *readReceiptsProcessor =
|
||||
[[OWSReadReceiptsProcessor alloc] initWithIncomingMessage:incomingMessage];
|
||||
[readReceiptsProcessor process];
|
||||
|
||||
[[TextSecureKitEnv sharedEnv]
|
||||
.notificationsManager notifyUserForIncomingMessage:incomingMessage
|
||||
from:name
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
#import "TSStorageManager.h"
|
||||
#import "NSData+Base64.h"
|
||||
#import "OWSReadReceipt.h"
|
||||
#import "SignalRecipient.h"
|
||||
#import "TSAttachmentStream.h"
|
||||
#import "TSDatabaseSecondaryIndexes.h"
|
||||
|
@ -61,6 +62,7 @@ static NSString *keychainDBPassAccount = @"TSDatabasePass";
|
|||
[TSDatabaseView registerSecondaryDevicesDatabaseView];
|
||||
|
||||
[self.database registerExtension:[TSDatabaseSecondaryIndexes registerTimeStampIndex] withName:@"idx"];
|
||||
[OWSReadReceipt registerIndexOnSenderIdAndTimestampWithDatabase:self.database];
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue