Merge pull request #186 from loki-project/threading

Fix Device Link Fetching Timeout
This commit is contained in:
Niels Andriesse 2020-05-06 16:33:39 +10:00 committed by GitHub
commit 3b4860cf7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 26 additions and 30 deletions

View File

@ -55,8 +55,8 @@ public extension LokiAPI {
]
print("[Loki] Populating snode pool using: \(target).")
let (promise, seal) = Promise<LokiAPITarget>.pending()
attempt(maxRetryCount: 4, recoveringOn: workQueue) {
HTTP.execute(.post, url, parameters: parameters).map(on: workQueue) { json in
attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.global()) {
HTTP.execute(.post, url, parameters: parameters).map(on: DispatchQueue.global()) { json in
guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed }
randomSnodePool = try Set(rawTargets.flatMap { rawTarget in
guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else {
@ -68,9 +68,9 @@ public extension LokiAPI {
// randomElement() uses the system's default random generator, which is cryptographically secure
return randomSnodePool.randomElement()!
}
}.done(on: workQueue) { snode in
}.done(on: DispatchQueue.global()) { snode in
seal.fulfill(snode)
}.catch(on: workQueue) { error in
}.catch(on: DispatchQueue.global()) { error in
print("[Loki] Failed to contact seed node at: \(target).")
seal.reject(error)
}
@ -105,7 +105,7 @@ public extension LokiAPI {
} else {
let url = URL(string: "\(snode.address):\(snode.port)/get_stats/v1")!
let request = TSRequest(url: url)
return TSNetworkManager.shared().perform(request, withCompletionQueue: workQueue).map(on: workQueue) { intermediate in
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { intermediate in
let rawResponse = intermediate.responseObject
guard let json = rawResponse as? JSON, let version = json["version"] as? String else { throw LokiAPIError.missingSnodeVersion }
snodeVersion[snode] = version
@ -113,8 +113,8 @@ public extension LokiAPI {
}
}
}
getRandomSnode().then(on: workQueue) { snode -> Promise<LokiAPITarget> in
return getVersion(for: snode).then(on: workQueue) { version -> Promise<LokiAPITarget> in
getRandomSnode().then(on: DispatchQueue.global()) { snode -> Promise<LokiAPITarget> in
return getVersion(for: snode).then(on: DispatchQueue.global()) { version -> Promise<LokiAPITarget> in
if version >= "2.0.2" {
print("[Loki] Using file server proxy with version number \(version).")
return Promise { $0.fulfill(snode) }
@ -122,12 +122,12 @@ public extension LokiAPI {
print("[Loki] Rejecting file server proxy with version number \(version).")
return getFileServerProxy()
}
}.recover(on: workQueue) { _ in
}.recover(on: DispatchQueue.global()) { _ in
return getFileServerProxy()
}
}.done(on: workQueue) { snode in
}.done(on: DispatchQueue.global()) { snode in
seal.fulfill(snode)
}.catch(on: workQueue) { error in
}.catch(on: DispatchQueue.global()) { error in
seal.reject(error)
}
return promise

View File

@ -38,16 +38,14 @@ public class LokiDotNetAPI : NSObject {
if let token = getAuthTokenFromDatabase(for: server) {
return Promise.value(token)
} else {
return requestNewAuthToken(for: server).then(on: LokiAPI.workQueue) { submitAuthToken($0, for: server) }.then(on: LokiAPI.workQueue) { token -> Promise<String> in
let (promise, seal) = Promise<String>.pending()
return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global()) { token in
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
setAuthToken(for: server, to: token, in: transaction)
}
seal.fulfill(token)
}
return promise
return token
}
}
}
@ -89,7 +87,7 @@ public class LokiDotNetAPI : NSObject {
let url = URL(string: "\(server)/loki/v1/submit_challenge")!
let parameters = [ "pubKey" : userHexEncodedPublicKey, "token" : token ]
let request = TSRequest(url: url, method: "POST", parameters: parameters)
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map { _ in token }
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { _ in token }
}
// MARK: Public API

View File

@ -38,11 +38,11 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set<String>) -> Promise<Set<DeviceLink>> {
let hexEncodedPublicKeysDescription = "[ \(hexEncodedPublicKeys.joined(separator: ", ")) ]"
print("[Loki] Getting device links for: \(hexEncodedPublicKeysDescription).")
return getAuthToken(for: server).then(on: LokiAPI.workQueue) { token -> Promise<Set<DeviceLink>> in
return getAuthToken(for: server).then(on: DispatchQueue.global()) { token -> Promise<Set<DeviceLink>> in
let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1"
let url = URL(string: "\(server)/users?\(queryParameters)")!
let request = TSRequest(url: url)
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map(on: LokiAPI.workQueue) { rawResponse -> Set<DeviceLink> in
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse -> Set<DeviceLink> in
guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else {
print("[Loki] Couldn't parse device links for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed
@ -84,16 +84,14 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
return deviceLink
}
})
}.then(on: LokiAPI.workQueue) { deviceLinks -> Promise<Set<DeviceLink>> in
let (promise, seal) = Promise<Set<DeviceLink>>.pending()
}.map(on: DispatchQueue.global()) { deviceLinks in
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setDeviceLinks(deviceLinks, in: transaction)
}
seal.fulfill(deviceLinks)
}
return promise
return deviceLinks
}
}
}

View File

@ -46,10 +46,10 @@ internal class LokiFileServerProxy : LokiHTTPClient {
internal func performLokiFileServerNSURLRequest(_ request: NSURLRequest, withCompletionQueue queue: DispatchQueue = DispatchQueue.main) -> LokiAPI.RawResponsePromise {
var headers = getCanonicalHeaders(for: request)
return Promise<LokiAPI.RawResponse> { [server = self.server, keyPair = self.keyPair, httpSession = self.httpSession] seal in
LokiAPI.workQueue.async {
DispatchQueue.global(qos: .userInitiated).async {
let uncheckedSymmetricKey = try? Curve25519.generateSharedSecret(fromPublicKey: LokiFileServerProxy.fileServerPublicKey, privateKey: keyPair.privateKey)
guard let symmetricKey = uncheckedSymmetricKey else { return seal.reject(Error.symmetricKeyGenerationFailed) }
LokiAPI.getFileServerProxy().then(on: LokiAPI.workQueue) { proxy -> Promise<Any> in
LokiAPI.getFileServerProxy().then(on: DispatchQueue.global()) { proxy -> Promise<Any> in
let url = "\(proxy.address):\(proxy.port)/file_proxy"
guard let urlAsString = request.url?.absoluteString, let serverURLEndIndex = urlAsString.range(of: server)?.upperBound,
serverURLEndIndex < urlAsString.endIndex else { throw Error.endpointParsingFailed }
@ -102,7 +102,7 @@ internal class LokiFileServerProxy : LokiHTTPClient {
}
task.resume()
return promise
}.map(on: LokiAPI.workQueue) { rawResponse in
}.map(on: DispatchQueue.global(qos: .userInitiated)) { rawResponse in
guard let responseAsData = rawResponse as? Data, let responseAsJSON = try? JSONSerialization.jsonObject(with: responseAsData, options: .allowFragments) as? JSON, let base64EncodedCipherText = responseAsJSON["data"] as? String,
let meta = responseAsJSON["meta"] as? JSON, let statusCode = meta["code"] as? Int, let cipherText = Data(base64Encoded: base64EncodedCipherText) else {
print("[Loki] Received an invalid response.")
@ -115,9 +115,9 @@ internal class LokiFileServerProxy : LokiHTTPClient {
let uncheckedJSON = try? JSONSerialization.jsonObject(with: uncheckedJSONAsData, options: .allowFragments) as? JSON
guard let json = uncheckedJSON else { throw HTTPError.networkError(code: -1, response: nil, underlyingError: Error.proxyResponseParsingFailed) }
return json
}.done(on: LokiAPI.workQueue) { rawResponse in
}.done(on: DispatchQueue.global()) { rawResponse in
seal.fulfill(rawResponse)
}.catch(on: LokiAPI.workQueue) { error in
}.catch(on: DispatchQueue.global()) { error in
print("[Loki] File server proxy request failed with error: \(error.localizedDescription).")
seal.reject(HTTPError.from(error: error) ?? error)
}

View File

@ -10,7 +10,7 @@ internal class LokiHTTPClient {
securityPolicy.validatesDomainName = false
result.securityPolicy = securityPolicy
result.responseSerializer = AFHTTPResponseSerializer()
result.completionQueue = LokiAPI.workQueue
result.completionQueue = DispatchQueue.global()
return result
}()

View File

@ -295,10 +295,10 @@ public extension MultiDeviceProtocol {
}
if timeSinceLastUpdate > deviceLinkUpdateInterval {
let masterHexEncodedPublicKey = storage.getMasterHexEncodedPublicKey(for: hexEncodedPublicKey, in: transaction) ?? hexEncodedPublicKey
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey).done(on: LokiAPI.workQueue) { _ in
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey).done(on: DispatchQueue.global()) { _ in
getDestinations()
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
}.catch(on: LokiAPI.workQueue) { error in
}.catch(on: DispatchQueue.global()) { error in
if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed {
// Don't immediately re-fetch in case of failure due to a parsing error
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()

View File

@ -1271,7 +1271,7 @@ NS_ASSUME_NONNULL_BEGIN
}).catchOn(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(NSError *error) {
dispatch_semaphore_signal(semaphore);
}) retainUntilComplete];
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, 4 * NSEC_PER_SEC));
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, 8 * NSEC_PER_SEC));
}
// FIXME: ========