Merge pull request #195 from loki-project/deadlock-fix

Fix Deadlock (Modified)
This commit is contained in:
Niels Andriesse 2020-05-20 11:54:11 +10:00 committed by GitHub
commit 9ae5be8eb9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 53 additions and 13 deletions

View file

@ -126,6 +126,9 @@
<Test
Identifier = "OWSSignalAddressTest">
</Test>
<Test
Identifier = "OWSUDManagerTest">
</Test>
<Test
Identifier = "PhoneNumberTest">
</Test>

View file

@ -84,18 +84,17 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
return deviceLink
}
})
}.then(on: DispatchQueue.global()) { deviceLinks -> Promise<Set<DeviceLink>> in
let (promise, seal) = Promise<Set<DeviceLink>>.pending()
}.map(on: DispatchQueue.global()) { deviceLinks in
storage.cacheDeviceLinks(deviceLinks)
/*
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setDeviceLinks(deviceLinks, in: transaction)
}
// We have to wait for the device links to be stored because a lot of our logic relies
// on them being in the database
seal.fulfill(deviceLinks)
}
return promise
*/
return deviceLinks
}
}
}

View file

@ -1,9 +1,27 @@
public extension OWSPrimaryStorage {
// MARK: Session Requests
private static let sessionRequestTimestampCollection = "LokiSessionRequestTimestampCollection"
public func setSessionRequestTimestamp(for publicKey: String, to timestamp: Date, in transaction: YapDatabaseReadWriteTransaction) {
transaction.setDate(timestamp, forKey: publicKey, inCollection: OWSPrimaryStorage.sessionRequestTimestampCollection)
}
public func getSessionRequestTimestamp(for publicKey: String, in transaction: YapDatabaseReadTransaction) -> Date? {
transaction.date(forKey: publicKey, inCollection: OWSPrimaryStorage.sessionRequestTimestampCollection)
}
// MARK: Multi Device
private static var deviceLinkCache: Set<DeviceLink> = []
private func getDeviceLinkCollection(for masterHexEncodedPublicKey: String) -> String {
return "LokiDeviceLinkCollection-\(masterHexEncodedPublicKey)"
}
public func cacheDeviceLinks(_ deviceLinks: Set<DeviceLink>) {
OWSPrimaryStorage.deviceLinkCache.formUnion(deviceLinks)
}
public func setDeviceLinks(_ deviceLinks: Set<DeviceLink>, in transaction: YapDatabaseReadWriteTransaction) {
// TODO: Clear collections first?
@ -11,16 +29,24 @@ public extension OWSPrimaryStorage {
}
public func addDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) {
OWSPrimaryStorage.deviceLinkCache.insert(deviceLink)
/*
let collection = getDeviceLinkCollection(for: deviceLink.master.hexEncodedPublicKey)
transaction.setObject(deviceLink, forKey: deviceLink.slave.hexEncodedPublicKey, inCollection: collection)
*/
}
public func removeDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) {
OWSPrimaryStorage.deviceLinkCache.remove(deviceLink)
/*
let collection = getDeviceLinkCollection(for: deviceLink.master.hexEncodedPublicKey)
transaction.removeObject(forKey: deviceLink.slave.hexEncodedPublicKey, inCollection: collection)
*/
}
public func getDeviceLinks(for masterHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> Set<DeviceLink> {
return OWSPrimaryStorage.deviceLinkCache.filter { $0.master.hexEncodedPublicKey == masterHexEncodedPublicKey }
/*
let collection = getDeviceLinkCollection(for: masterHexEncodedPublicKey)
guard !transaction.allKeys(inCollection: collection).isEmpty else { return [] } // Fixes a crash that used to occur on Josh's device
var result: Set<DeviceLink> = []
@ -29,9 +55,12 @@ public extension OWSPrimaryStorage {
result.insert(deviceLink)
}
return result
*/
}
public func getDeviceLink(for slaveHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> DeviceLink? {
return OWSPrimaryStorage.deviceLinkCache.filter { $0.slave.hexEncodedPublicKey == slaveHexEncodedPublicKey }.first
/*
let query = YapDatabaseQuery(string: "WHERE \(DeviceLinkIndex.slaveHexEncodedPublicKey) = ?", parameters: [ slaveHexEncodedPublicKey ])
let deviceLinks = DeviceLinkIndex.getDeviceLinks(for: query, in: transaction)
guard deviceLinks.count <= 1 else {
@ -39,12 +68,14 @@ public extension OWSPrimaryStorage {
return nil
}
return deviceLinks.first
*/
}
public func getMasterHexEncodedPublicKey(for slaveHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> String? {
return getDeviceLink(for: slaveHexEncodedPublicKey, in: transaction)?.master.hexEncodedPublicKey
}
// MARK: Open Groups
public func getUserCount(for publicChat: LokiPublicChat, in transaction: YapDatabaseReadTransaction) -> Int? {
return transaction.object(forKey: publicChat.id, inCollection: "LokiPublicChatUserCountCollection") as? Int
}

View file

@ -41,9 +41,10 @@ public final class ClosedGroupsProtocol : NSObject {
let thread = TSContactThread.getOrCreateThread(withContactId: hexEncodedPublicKey, transaction: transaction)
thread.save(with: transaction)
let sessionRequestMessage = SessionRequestMessage(thread: thread)
storage.setSessionRequestTimestamp(for: hexEncodedPublicKey, to: Date(), in: transaction)
let messageSenderJobQueue = SSKEnvironment.shared.messageSenderJobQueue
// This has to happen sync to ensure that session requests get sent before AFRs do (it's
// asssumed that the master device first syncs closed groups first and contacts after that).
// asssumed that the master device syncs closed groups first and contacts after that).
messageSenderJobQueue.add(message: sessionRequestMessage, transaction: transaction)
}
}

View file

@ -55,7 +55,6 @@ public final class SessionMetaProtocol : NSObject {
}
// MARK: Note to Self
@objc(isThreadNoteToSelf:)
public static func isThreadNoteToSelf(_ thread: TSThread) -> Bool {
guard let thread = thread as? TSContactThread else { return false }

View file

@ -24,7 +24,7 @@ public final class MultiDeviceProtocol : NSObject {
internal static var storage: OWSPrimaryStorage { OWSPrimaryStorage.shared() }
// MARK: - Settings
public static let deviceLinkUpdateInterval: TimeInterval = 20
public static let deviceLinkUpdateInterval: TimeInterval = 60
// MARK: - Multi Device Destination
public struct MultiDeviceDestination : Hashable {

View file

@ -145,6 +145,7 @@ public final class SessionManagementProtocol : NSObject {
storage.dbReadWriteConnection.readWrite { transaction in
let thread = TSContactThread.getOrCreateThread(withContactId: hexEncodedPublicKey, transaction: transaction)
let sessionRequestMessage = SessionRequestMessage(thread: thread)
storage.setSessionRequestTimestamp(for: hexEncodedPublicKey, to: Date(), in: transaction)
let messageSenderJobQueue = SSKEnvironment.shared.messageSenderJobQueue
messageSenderJobQueue.add(message: sessionRequestMessage, transaction: transaction)
}
@ -192,6 +193,11 @@ public final class SessionManagementProtocol : NSObject {
public static func handleSessionRequestMessage(_ dataMessage: SSKProtoDataMessage, wrappedIn envelope: SSKProtoEnvelope, using transaction: YapDatabaseReadWriteTransaction) {
// The envelope source is set during UD decryption
let hexEncodedPublicKey = envelope.source!
if let sentSessionRequestTimestamp = storage.getSessionRequestTimestamp(for: hexEncodedPublicKey, in: transaction),
envelope.timestamp < NSDate.ows_millisecondsSince1970(for: sentSessionRequestTimestamp) {
// We sent a session request after this one was sent
return
}
var closedGroupMembers: Set<String> = []
TSGroupThread.enumerateCollectionObjects(with: transaction) { object, _ in
guard let group = object as? TSGroupThread, group.groupModel.groupType == .closedGroup,

View file

@ -1269,9 +1269,10 @@ NS_ASSUME_NONNULL_BEGIN
// The envelope source is set during UD decryption
if ([ECKeyPair isValidHexEncodedPublicKeyWithCandidate:envelope.source] && dataMessage.publicChatInfo == nil) { // Handled in LokiPublicChatPoller for open group messages
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
[[LKMultiDeviceProtocol updateDeviceLinksIfNeededForHexEncodedPublicKey:envelope.source in:transaction].ensureOn(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^() {
dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
[[LKMultiDeviceProtocol updateDeviceLinksIfNeededForHexEncodedPublicKey:envelope.source in:transaction].ensureOn(queue, ^() {
dispatch_semaphore_signal(semaphore);
}).catchOn(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(NSError *error) {
}).catchOn(queue, ^(NSError *error) {
dispatch_semaphore_signal(semaphore);
}) retainUntilComplete];
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, 10 * NSEC_PER_SEC));