Merge branch 'mkirk/batch-contact-intersections'

This commit is contained in:
Michael Kirk 2018-07-19 13:40:38 -06:00
commit 0513077d29
9 changed files with 389 additions and 96 deletions

2
Pods

@ -1 +1 @@
Subproject commit 2bace50e8e41e281a9ddd525bc4b8a97446301ca
Subproject commit e9badd4baacc97c7159f7f8ffd2a51f4581fb869

View File

@ -9,10 +9,18 @@
#import "OWSRequestFactory.h"
#import "PhoneNumber.h"
#import "TSNetworkManager.h"
#import <SignalServiceKit/SignalServiceKit-Swift.h>
#import <YapDatabase/YapDatabase.h>
NS_ASSUME_NONNULL_BEGIN
@interface ContactsUpdater ()
@property (nonatomic, readonly) NSOperationQueue *contactIntersectionQueue;
@end
@implementation ContactsUpdater
+ (instancetype)sharedUpdater {
@ -32,6 +40,9 @@ NS_ASSUME_NONNULL_BEGIN
return self;
}
_contactIntersectionQueue = [NSOperationQueue new];
_contactIntersectionQueue.maxConcurrentOperationCount = 1;
OWSSingletonAssert();
return self;
@ -88,75 +99,36 @@ NS_ASSUME_NONNULL_BEGIN
- (void)contactIntersectionWithSet:(NSSet<NSString *> *)recipientIdsToLookup
success:(void (^)(NSSet<SignalRecipient *> *recipients))success
failure:(void (^)(NSError *error))failure {
failure:(void (^)(NSError *error))failure
{
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
NSMutableDictionary<NSString *, NSString *> *phoneNumbersByHashes = [NSMutableDictionary new];
for (NSString *recipientId in recipientIdsToLookup) {
NSString *hash = [Cryptography truncatedSHA1Base64EncodedWithoutPadding:recipientId];
phoneNumbersByHashes[hash] = recipientId;
}
NSArray<NSString *> *hashes = [phoneNumbersByHashes allKeys];
TSRequest *request = [OWSRequestFactory contactsIntersectionRequestWithHashesArray:hashes];
[[TSNetworkManager sharedManager] makeRequest:request
success:^(NSURLSessionDataTask *task, id responseDict) {
NSMutableSet<NSString *> *registeredRecipientIds = [NSMutableSet new];
if ([responseDict isKindOfClass:[NSDictionary class]]) {
NSArray<NSDictionary *> *_Nullable contactsArray = responseDict[@"contacts"];
if ([contactsArray isKindOfClass:[NSArray class]]) {
for (NSDictionary *contactDict in contactsArray) {
if (![contactDict isKindOfClass:[NSDictionary class]]) {
OWSProdLogAndFail(@"%@ invalid contact dictionary.", self.logTag);
continue;
}
NSString *_Nullable hash = contactDict[@"token"];
if (hash.length < 1) {
OWSProdLogAndFail(@"%@ contact missing hash.", self.logTag);
continue;
}
NSString *_Nullable recipientId = phoneNumbersByHashes[hash];
if (recipientId.length < 1) {
OWSProdLogAndFail(@"%@ An intersecting hash wasn't found in the mapping.", self.logTag);
continue;
}
if (![recipientIdsToLookup containsObject:recipientId]) {
OWSProdLogAndFail(@"%@ Intersection response included unexpected recipient.", self.logTag);
continue;
}
[registeredRecipientIds addObject:recipientId];
}
}
}
NSMutableSet<SignalRecipient *> *recipients = [NSMutableSet new];
[OWSPrimaryStorage.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
for (NSString *recipientId in recipientIdsToLookup) {
if ([registeredRecipientIds containsObject:recipientId]) {
SignalRecipient *recipient =
[SignalRecipient markRecipientAsRegisteredAndGet:recipientId transaction:transaction];
[recipients addObject:recipient];
} else {
[SignalRecipient removeUnregisteredRecipient:recipientId transaction:transaction];
}
}
}];
OWSContactDiscoveryOperation *operation =
[[OWSContactDiscoveryOperation alloc] initWithRecipientIdsToLookup:recipientIdsToLookup.allObjects];
success([recipients copy]);
}
failure:^(NSURLSessionDataTask *task, NSError *error) {
if (!IsNSErrorNetworkFailure(error)) {
OWSProdError([OWSAnalyticsEvents contactsErrorContactsIntersectionFailed]);
}
NSArray<NSOperation *> *operationAndDependencies = [operation.dependencies arrayByAddingObject:operation];
[self.contactIntersectionQueue addOperations:operationAndDependencies waitUntilFinished:YES];
NSHTTPURLResponse *response = (NSHTTPURLResponse *)task.response;
if (response.statusCode == 413) {
failure(OWSErrorWithCodeDescription(
OWSErrorCodeContactsUpdaterRateLimit, @"Contacts Intersection Rate Limit"));
} else {
failure(error);
}
}];
if (operation.failingError != nil) {
failure(operation.failingError);
return;
}
NSSet<NSString *> *registeredRecipientIds = operation.registeredRecipientIds;
NSMutableSet<SignalRecipient *> *recipients = [NSMutableSet new];
[OWSPrimaryStorage.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
for (NSString *recipientId in recipientIdsToLookup) {
if ([registeredRecipientIds containsObject:recipientId]) {
SignalRecipient *recipient =
[SignalRecipient markRecipientAsRegisteredAndGet:recipientId transaction:transaction];
[recipients addObject:recipient];
} else {
[SignalRecipient removeUnregisteredRecipient:recipientId transaction:transaction];
}
}
}];
success([recipients copy]);
});
}

View File

@ -0,0 +1,281 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
import Foundation
@objc(OWSContactDiscoveryOperation)
class ContactDiscoveryOperation: OWSOperation, LegacyContactDiscoveryBatchOperationDelegate {
let batchSize = 2048
let recipientIdsToLookup: [String]
@objc
var registeredRecipientIds: Set<String>
@objc
required init(recipientIdsToLookup: [String]) {
self.recipientIdsToLookup = recipientIdsToLookup
self.registeredRecipientIds = Set()
super.init()
Logger.debug("\(logTag) in \(#function) with recipientIdsToLookup: \(recipientIdsToLookup.count)")
for batchIds in recipientIdsToLookup.chunked(by: batchSize) {
let batchOperation = LegacyContactDiscoveryBatchOperation(recipientIdsToLookup: batchIds)
batchOperation.delegate = self
self.addDependency(batchOperation)
}
}
// MARK: Mandatory overrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
Logger.debug("\(logTag) in \(#function)")
for dependency in self.dependencies {
guard let batchOperation = dependency as? LegacyContactDiscoveryBatchOperation else {
owsFail("\(self.logTag) in \(#function) unexpected dependency: \(dependency)")
continue
}
self.registeredRecipientIds.formUnion(batchOperation.registeredRecipientIds)
}
self.reportSuccess()
}
// MARK: LegacyContactDiscoveryBatchOperationDelegate
func contactDiscoverBatchOperation(_ contactDiscoverBatchOperation: LegacyContactDiscoveryBatchOperation, didFailWithError error: Error) {
Logger.debug("\(logTag) in \(#function) canceling self and all dependencies.")
self.dependencies.forEach { $0.cancel() }
self.cancel()
}
}
protocol LegacyContactDiscoveryBatchOperationDelegate: class {
func contactDiscoverBatchOperation(_ contactDiscoverBatchOperation: LegacyContactDiscoveryBatchOperation, didFailWithError error: Error)
}
class LegacyContactDiscoveryBatchOperation: OWSOperation {
var registeredRecipientIds: Set<String>
weak var delegate: LegacyContactDiscoveryBatchOperationDelegate?
private let recipientIdsToLookup: [String]
private var networkManager: TSNetworkManager {
return TSNetworkManager.shared()
}
// MARK: Initializers
required init(recipientIdsToLookup: [String]) {
self.recipientIdsToLookup = recipientIdsToLookup
self.registeredRecipientIds = Set()
super.init()
Logger.debug("\(logTag) in \(#function) with recipientIdsToLookup: \(recipientIdsToLookup.count)")
}
// MARK: OWSOperation Overrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
Logger.debug("\(logTag) in \(#function)")
guard !isCancelled else {
Logger.info("\(logTag) in \(#function) no work to do, since we were canceled")
self.reportCancelled()
return
}
var phoneNumbersByHashes: [String: String] = [:]
for recipientId in recipientIdsToLookup {
let hash = Cryptography.truncatedSHA1Base64EncodedWithoutPadding(recipientId)
assert(phoneNumbersByHashes[hash] == nil)
phoneNumbersByHashes[hash] = recipientId
}
let hashes: [String] = Array(phoneNumbersByHashes.keys)
let request = OWSRequestFactory.contactsIntersectionRequest(withHashesArray: hashes)
self.networkManager.makeRequest(request,
success: { (task, responseDict) in
do {
self.registeredRecipientIds = try self.parse(response: responseDict, phoneNumbersByHashes: phoneNumbersByHashes)
self.reportSuccess()
} catch {
self.reportError(error)
}
},
failure: { (task, error) in
guard let response = task.response as? HTTPURLResponse else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
self.reportError(responseError)
return
}
guard response.statusCode != 413 else {
let rateLimitError = OWSErrorWithCodeDescription(OWSErrorCode.contactsUpdaterRateLimit, "Contacts Intersection Rate Limit")
self.reportError(rateLimitError)
return
}
self.reportError(error)
})
}
// Called at most one time.
override func didSucceed() {
// Compare against new CDS service
let newCDSBatchOperation = CDSBatchOperation(recipientIdsToLookup: self.recipientIdsToLookup)
let cdsFeedbackOperation = CDSFeedbackOperation(legacyRegisteredRecipientIds: self.registeredRecipientIds)
cdsFeedbackOperation.addDependency(newCDSBatchOperation)
CDSFeedbackOperation.operationQueue.addOperations([newCDSBatchOperation, cdsFeedbackOperation], waitUntilFinished: false)
}
// Called at most one time.
override func didFail(error: Error) {
self.delegate?.contactDiscoverBatchOperation(self, didFailWithError: error)
}
// MARK: Private Helpers
private func parse(response: Any?, phoneNumbersByHashes: [String: String]) throws -> Set<String> {
guard let responseDict = response as? [String: AnyObject] else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
throw responseError
}
guard let contactDicts = responseDict["contacts"] as? [[String: AnyObject]] else {
let responseError: NSError = OWSErrorMakeUnableToProcessServerResponseError() as NSError
responseError.isRetryable = true
throw responseError
}
var registeredRecipientIds: Set<String> = Set()
for contactDict in contactDicts {
guard let hash = contactDict["token"] as? String, hash.count > 0 else {
owsFail("\(self.logTag) in \(#function) hash was unexpectedly nil")
continue
}
guard let recipientId = phoneNumbersByHashes[hash], recipientId.count > 0 else {
owsFail("\(self.logTag) in \(#function) recipientId was unexpectedly nil")
continue
}
guard recipientIdsToLookup.contains(recipientId) else {
owsFail("\(self.logTag) in \(#function) unexpected recipientId")
continue
}
registeredRecipientIds.insert(recipientId)
}
return registeredRecipientIds
}
}
class CDSBatchOperation: OWSOperation {
private let recipientIdsToLookup: [String]
var registeredRecipientIds: Set<String>
// MARK: Initializers
required init(recipientIdsToLookup: [String]) {
self.recipientIdsToLookup = recipientIdsToLookup
self.registeredRecipientIds = Set()
super.init()
Logger.debug("\(logTag) in \(#function) with recipientIdsToLookup: \(recipientIdsToLookup.count)")
}
// MARK: OWSOperationOverrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
Logger.debug("\(logTag) in \(#function)")
Logger.debug("\(logTag) in \(#function) FAKING intersection (TODO)")
self.registeredRecipientIds = Set(self.recipientIdsToLookup)
self.reportSuccess()
}
}
class CDSFeedbackOperation: OWSOperation {
static let operationQueue: OperationQueue = {
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
return queue
}()
private let legacyRegisteredRecipientIds: Set<String>
// MARK: Initializers
required init(legacyRegisteredRecipientIds: Set<String>) {
self.legacyRegisteredRecipientIds = legacyRegisteredRecipientIds
super.init()
Logger.debug("\(logTag) in \(#function)")
}
// MARK: OWSOperation Overrides
// Called every retry, this is where the bulk of the operation's work should go.
override func run() {
guard let cdsOperation = dependencies.first as? CDSBatchOperation else {
let error = OWSErrorMakeAssertionError("\(self.logTag) in \(#function) cdsOperation was unexpectedly nil")
self.reportError(error)
return
}
let cdsRegisteredRecipientIds = cdsOperation.registeredRecipientIds
if cdsRegisteredRecipientIds == legacyRegisteredRecipientIds {
Logger.debug("\(logTag) in \(#function) TODO: PUT /v1/directory/feedback/ok")
} else {
Logger.debug("\(logTag) in \(#function) TODO: PUT /v1/directory/feedback/mismatch")
}
self.reportSuccess()
}
override func didFail(error: Error) {
// dependency failed.
// Depending on error, PUT one of:
// /v1/directory/feedback/server-error:
// /v1/directory/feedback/client-error:
// /v1/directory/feedback/attestation-error:
// /v1/directory/feedback/unexpected-error:
Logger.debug("\(logTag) in \(#function) TODO: PUT /v1/directory/feedback/*-error")
}
}
extension Array {
func chunked(by chunkSize: Int) -> [[Element]] {
return stride(from: 0, to: self.count, by: chunkSize).map {
Array(self[$0..<Swift.min($0 + chunkSize, self.count)])
}
}
}

View File

@ -132,21 +132,9 @@ void AssertIsOnSendingQueue()
- (nullable NSError *)checkForPreconditionError
{
for (NSOperation *dependency in self.dependencies) {
if (![dependency isKindOfClass:[OWSOperation class]]) {
NSString *errorDescription =
[NSString stringWithFormat:@"%@ unknown dependency: %@", self.logTag, dependency.class];
NSError *assertionError = OWSErrorMakeAssertionError(errorDescription);
return assertionError;
}
OWSOperation *upload = (OWSOperation *)dependency;
// Cannot proceed if dependency failed - surface the dependency's error.
NSError *_Nullable dependencyError = upload.failingError;
if (dependencyError) {
return dependencyError;
}
NSError *_Nullable error = [super checkForPreconditionError];
if (error) {
return error;
}
// Sanity check preconditions

View File

@ -41,7 +41,7 @@ typedef NS_ENUM(NSUInteger, TSVerificationTransport) { TSVerificationTransportVo
+ (TSRequest *)availablePreKeysCountRequest;
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray *)hashes;
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray<NSString *> *)hashes;
+ (TSRequest *)currentSignedPreKeyRequest;

View File

@ -116,7 +116,7 @@ NS_ASSUME_NONNULL_BEGIN
return [TSRequest requestWithUrl:[NSURL URLWithString:path] method:@"GET" parameters:@{}];
}
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray *)hashes
+ (TSRequest *)contactsIntersectionRequestWithHashesArray:(NSArray<NSString *> *)hashes
{
OWSAssert(hashes.count > 0);

View File

@ -0,0 +1,6 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
//
// ObjC classes from which Swift classes inherit must be included in this framework header.
#import "OWSOperation.h"

View File

@ -24,31 +24,45 @@ typedef NS_ENUM(NSInteger, OWSOperationState) {
// any of the errors were fatal. Fatal errors trump retryable errors.
@interface OWSOperation : NSOperation
@property (nullable) NSError *failingError;
@property (readonly, nullable) NSError *failingError;
// Defaults to 0, set to greater than 0 in init if you'd like the operation to be retryable.
@property NSUInteger remainingRetries;
#pragma mark - Subclass Overrides
// Called one time only
- (nullable NSError *)checkForPreconditionError;
#pragma mark - Mandatory Subclass Overrides
// Called every retry, this is where the bulk of the operation's work should go.
- (void)run;
#pragma mark - Optional Subclass Overrides
// Called one time only
- (nullable NSError *)checkForPreconditionError;
// Called at most one time.
- (void)didSucceed;
// Called at most one time.
- (void)didCancel;
// Called at most one time, once retry is no longer possible.
- (void)didFailWithError:(NSError *)error;
- (void)didFailWithError:(NSError *)error NS_SWIFT_NAME(didFail(error:));
#pragma mark - Success/Error - Do Not Override
// Complete the operation successfully.
// Should be called at most once per operation instance.
// You must ensure that `run` cannot fail after calling `reportSuccess`.
// Report that the operation completed successfully.
//
// Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:`
- (void)reportSuccess;
// Should be called at most once per `run`.
// Call this when you abort before completion due to being cancelled.
//
// Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:`
- (void)reportCancelled;
// Report that the operation failed to complete due to an error.
//
// Each invocation of `run` must make exactly one call to one of: `reportSuccess`, `reportCancelled`, or `reportError:`
// You must ensure that `run` cannot succeed after calling `reportError`, e.g. generally you'll write something like
// this:
//

View File

@ -5,6 +5,7 @@
#import "OWSOperation.h"
#import "NSError+MessageSending.h"
#import "OWSBackgroundTask.h"
#import "OWSError.h"
NS_ASSUME_NONNULL_BEGIN
@ -13,6 +14,7 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
@interface OWSOperation ()
@property (nullable) NSError *failingError;
@property (atomic) OWSOperationState operationState;
@property (nonatomic) OWSBackgroundTask *backgroundTask;
@ -46,8 +48,23 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
// Called one time only
- (nullable NSError *)checkForPreconditionError
{
// no-op
// Override in subclass if necessary
for (NSOperation *dependency in self.dependencies) {
if (![dependency isKindOfClass:[OWSOperation class]]) {
NSString *errorDescription =
[NSString stringWithFormat:@"%@ unknown dependency: %@", self.logTag, dependency.class];
return OWSErrorMakeAssertionError(errorDescription);
}
OWSOperation *dependentOperation = (OWSOperation *)dependency;
// Don't proceed if dependency failed - surface the dependency's error.
NSError *_Nullable dependencyError = dependentOperation.failingError;
if (dependencyError != nil) {
return dependencyError;
}
}
return nil;
}
@ -64,6 +81,13 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
// Override in subclass if necessary
}
// Called at most one time.
- (void)didCancel
{
// no-op
// Override in subclass if necessary
}
// Called at most one time, once retry is no longer possible.
- (void)didFailWithError:(NSError *)error
{
@ -96,6 +120,14 @@ NSString *const OWSOperationKeyIsFinished = @"isFinished";
[self markAsComplete];
}
// These methods are not intended to be subclassed
- (void)reportCancelled
{
DDLogDebug(@"%@ cancelled.", self.logTag);
[self didCancel];
[self markAsComplete];
}
- (void)reportError:(NSError *)error
{
DDLogDebug(@"%@ reportError: %@, fatal?: %d, retryable?: %d, remainingRetries: %lu",