Use a serial queue for core operations
This commit is contained in:
parent
ac0a20c8a7
commit
2e38b382c2
|
@ -38,7 +38,6 @@ public extension LokiAPI {
|
|||
|
||||
// MARK: Internal API
|
||||
internal static func getRandomSnode() -> Promise<LokiAPITarget> {
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
if randomSnodePool.isEmpty {
|
||||
let target = seedNodePool.randomElement()!
|
||||
let url = "\(target)/json_rpc"
|
||||
|
@ -56,7 +55,7 @@ public extension LokiAPI {
|
|||
]
|
||||
print("[Loki] Populating snode pool using: \(target).")
|
||||
let (promise, seal) = Promise<LokiAPITarget>.pending()
|
||||
let queue = DispatchQueue.global()
|
||||
let queue = workQueue
|
||||
HTTP.execute(.post, url, parameters: parameters).map(on: queue) { json in
|
||||
guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed }
|
||||
randomSnodePool = try Set(rawTargets.flatMap { rawTarget in
|
||||
|
@ -88,8 +87,7 @@ public extension LokiAPI {
|
|||
return Promise<[LokiAPITarget]> { $0.fulfill(cachedSwarm) }
|
||||
} else {
|
||||
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ]
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
return getRandomSnode().then(on: DispatchQueue.global()) { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
|
||||
return getRandomSnode().then(on: workQueue) { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,7 +97,6 @@ public extension LokiAPI {
|
|||
}
|
||||
|
||||
internal static func getFileServerProxy() -> Promise<LokiAPITarget> {
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
let (promise, seal) = Promise<LokiAPITarget>.pending()
|
||||
func getVersion(for snode: LokiAPITarget) -> Promise<String> {
|
||||
if let version = snodeVersion[snode] {
|
||||
|
@ -107,7 +104,7 @@ public extension LokiAPI {
|
|||
} else {
|
||||
let url = URL(string: "\(snode.address):\(snode.port)/get_stats/v1")!
|
||||
let request = TSRequest(url: url)
|
||||
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { intermediate in
|
||||
return TSNetworkManager.shared().perform(request, withCompletionQueue: workQueue).map(on: workQueue) { intermediate in
|
||||
let rawResponse = intermediate.responseObject
|
||||
guard let json = rawResponse as? JSON, let version = json["version"] as? String else { throw LokiAPIError.missingSnodeVersion }
|
||||
snodeVersion[snode] = version
|
||||
|
@ -115,8 +112,8 @@ public extension LokiAPI {
|
|||
}
|
||||
}
|
||||
}
|
||||
getRandomSnode().then(on: DispatchQueue.global()) { snode -> Promise<LokiAPITarget> in
|
||||
return getVersion(for: snode).then(on: DispatchQueue.global()) { version -> Promise<LokiAPITarget> in
|
||||
getRandomSnode().then(on: workQueue) { snode -> Promise<LokiAPITarget> in
|
||||
return getVersion(for: snode).then(on: workQueue) { version -> Promise<LokiAPITarget> in
|
||||
if version >= "2.0.2" {
|
||||
print("[Loki] Using file server proxy with version number \(version).")
|
||||
return Promise { $0.fulfill(snode) }
|
||||
|
@ -124,12 +121,12 @@ public extension LokiAPI {
|
|||
print("[Loki] Rejecting file server proxy with version number \(version).")
|
||||
return getFileServerProxy()
|
||||
}
|
||||
}.recover(on: DispatchQueue.global()) { _ in
|
||||
}.recover(on: workQueue) { _ in
|
||||
return getFileServerProxy()
|
||||
}
|
||||
}.done(on: DispatchQueue.global()) { snode in
|
||||
}.done(on: workQueue) { snode in
|
||||
seal.fulfill(snode)
|
||||
}.catch(on: DispatchQueue.global()) { error in
|
||||
}.catch(on: workQueue) { error in
|
||||
seal.reject(error)
|
||||
}
|
||||
return promise
|
||||
|
|
|
@ -20,6 +20,8 @@ public final class LokiAPI : NSObject {
|
|||
get { stateQueue.sync { _userHexEncodedPublicKeyCache } }
|
||||
set { stateQueue.sync { _userHexEncodedPublicKeyCache = newValue } }
|
||||
}
|
||||
|
||||
internal static let workQueue = DispatchQueue(label: "LokiAPI.workQueue", qos: .userInitiated)
|
||||
|
||||
/// All service node related errors must be handled on this queue to avoid race conditions maintaining e.g. failure counts.
|
||||
public static let errorHandlingQueue = DispatchQueue(label: "LokiAPI.errorHandlingQueue")
|
||||
|
@ -103,7 +105,7 @@ public final class LokiAPI : NSObject {
|
|||
if useOnionRequests {
|
||||
return OnionRequestAPI.sendOnionRequest(invoking: method, on: target, with: parameters, associatedWith: hexEncodedPublicKey).map { $0 as Any }
|
||||
} else {
|
||||
return TSNetworkManager.shared().perform(request, withCompletionQueue: DispatchQueue.global())
|
||||
return TSNetworkManager.shared().perform(request, withCompletionQueue: workQueue)
|
||||
.map { $0.responseObject }
|
||||
.handlingSnodeErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey)
|
||||
.recoveringNetworkErrorsIfNeeded()
|
||||
|
@ -134,7 +136,6 @@ public final class LokiAPI : NSObject {
|
|||
}
|
||||
|
||||
public static func getDestinations(for hexEncodedPublicKey: String, in transaction: YapDatabaseReadWriteTransaction) -> Promise<[Destination]> {
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
let (promise, seal) = Promise<[Destination]>.pending()
|
||||
func getDestinations(in transaction: YapDatabaseReadTransaction? = nil) {
|
||||
func getDestinationsInternal(in transaction: YapDatabaseReadTransaction) {
|
||||
|
@ -163,10 +164,10 @@ public final class LokiAPI : NSObject {
|
|||
}
|
||||
if timeSinceLastUpdate > deviceLinkUpdateInterval {
|
||||
let masterHexEncodedPublicKey = storage.getMasterHexEncodedPublicKey(for: hexEncodedPublicKey, in: transaction) ?? hexEncodedPublicKey
|
||||
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey, in: transaction).done(on: DispatchQueue.global()) { _ in
|
||||
LokiFileServerAPI.getDeviceLinks(associatedWith: masterHexEncodedPublicKey, in: transaction).done(on: workQueue) { _ in
|
||||
getDestinations()
|
||||
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
|
||||
}.catch(on: DispatchQueue.global()) { error in
|
||||
}.catch(on: workQueue) { error in
|
||||
if (error as? LokiDotNetAPI.LokiDotNetAPIError) == LokiDotNetAPI.LokiDotNetAPIError.parsingFailed {
|
||||
// Don't immediately re-fetch in case of failure due to a parsing error
|
||||
lastDeviceLinkUpdate[hexEncodedPublicKey] = Date()
|
||||
|
|
|
@ -46,7 +46,7 @@ public class LokiDotNetAPI : NSObject {
|
|||
if let token = getAuthTokenFromDatabase(for: server, in: transaction) {
|
||||
return Promise.value(token)
|
||||
} else {
|
||||
return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map { token -> String in
|
||||
return requestNewAuthToken(for: server).then(on: LokiAPI.workQueue) { submitAuthToken($0, for: server) }.map { token -> String in
|
||||
setAuthToken(for: server, to: token, in: transaction)
|
||||
return token
|
||||
}
|
||||
|
@ -159,11 +159,11 @@ public class LokiDotNetAPI : NSObject {
|
|||
}
|
||||
}
|
||||
if server == LokiFileServerAPI.server {
|
||||
DispatchQueue.global().async {
|
||||
LokiAPI.workQueue.async {
|
||||
proceed(with: "loki") // Uploads to the Loki File Server shouldn't include any personally identifiable information so use a dummy auth token
|
||||
}
|
||||
} else {
|
||||
getAuthToken(for: server).done(on: DispatchQueue.global()) { token in
|
||||
getAuthToken(for: server).done(on: LokiAPI.workQueue) { token in
|
||||
proceed(with: token)
|
||||
}.catch { error in
|
||||
print("[Loki] Couldn't upload attachment due to error: \(error).")
|
||||
|
@ -179,8 +179,7 @@ public class LokiDotNetAPI : NSObject {
|
|||
let queryParameters = "pubKey=\(userHexEncodedPublicKey)"
|
||||
let url = URL(string: "\(server)/loki/v1/get_challenge?\(queryParameters)")!
|
||||
let request = TSRequest(url: url)
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { rawResponse in
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map(on: LokiAPI.workQueue) { rawResponse in
|
||||
guard let json = rawResponse as? JSON, let base64EncodedChallenge = json["cipherText64"] as? String, let base64EncodedServerPublicKey = json["serverPubKey64"] as? String,
|
||||
let challenge = Data(base64Encoded: base64EncodedChallenge), var serverPublicKey = Data(base64Encoded: base64EncodedServerPublicKey) else {
|
||||
throw LokiDotNetAPIError.parsingFailed
|
||||
|
@ -204,8 +203,7 @@ public class LokiDotNetAPI : NSObject {
|
|||
let url = URL(string: "\(server)/loki/v1/submit_challenge")!
|
||||
let parameters = [ "pubKey" : userHexEncodedPublicKey, "token" : token ]
|
||||
let request = TSRequest(url: url, method: "POST", parameters: parameters)
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map { _ in token }
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map { _ in token }
|
||||
}
|
||||
|
||||
// MARK: Attachments (Public Obj-C API)
|
||||
|
|
|
@ -26,14 +26,13 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
|
|||
/// Gets the device links associated with the given hex encoded public keys from the
|
||||
/// server and stores and returns the valid ones.
|
||||
public static func getDeviceLinks(associatedWith hexEncodedPublicKeys: Set<String>, in transaction: YapDatabaseReadWriteTransaction? = nil) -> Promise<Set<DeviceLink>> {
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
let hexEncodedPublicKeysDescription = "[ \(hexEncodedPublicKeys.joined(separator: ", ")) ]"
|
||||
print("[Loki] Getting device links for: \(hexEncodedPublicKeysDescription).")
|
||||
return getAuthToken(for: server, in: transaction).then(on: DispatchQueue.global()) { token -> Promise<Set<DeviceLink>> in
|
||||
return getAuthToken(for: server, in: transaction).then(on: LokiAPI.workQueue) { token -> Promise<Set<DeviceLink>> in
|
||||
let queryParameters = "ids=\(hexEncodedPublicKeys.map { "@\($0)" }.joined(separator: ","))&include_user_annotations=1"
|
||||
let url = URL(string: "\(server)/users?\(queryParameters)")!
|
||||
let request = TSRequest(url: url)
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: DispatchQueue.global()).map(on: DispatchQueue.global()) { rawResponse -> Set<DeviceLink> in
|
||||
return LokiFileServerProxy(for: server).perform(request, withCompletionQueue: LokiAPI.workQueue).map(on: LokiAPI.workQueue) { rawResponse -> Set<DeviceLink> in
|
||||
guard let json = rawResponse as? JSON, let data = json["data"] as? [JSON] else {
|
||||
print("[Loki] Couldn't parse device links for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
|
||||
throw LokiDotNetAPIError.parsingFailed
|
||||
|
@ -75,7 +74,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
|
|||
return deviceLink
|
||||
}
|
||||
})
|
||||
}.map(on: DispatchQueue.global()) { deviceLinks -> Set<DeviceLink> in
|
||||
}.map(on: LokiAPI.workQueue) { deviceLinks -> Set<DeviceLink> in
|
||||
func setDeviceLinks(in transaction: YapDatabaseReadWriteTransaction) {
|
||||
storage.setDeviceLinks(deviceLinks, in: transaction)
|
||||
}
|
||||
|
|
|
@ -44,13 +44,12 @@ internal class LokiFileServerProxy : LokiHTTPClient {
|
|||
}
|
||||
|
||||
internal func performLokiFileServerNSURLRequest(_ request: NSURLRequest, withCompletionQueue queue: DispatchQueue = DispatchQueue.main) -> LokiAPI.RawResponsePromise {
|
||||
// All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
var headers = getCanonicalHeaders(for: request)
|
||||
return Promise<LokiAPI.RawResponse> { [server = self.server, keyPair = self.keyPair, httpSession = self.httpSession] seal in
|
||||
DispatchQueue.global().async {
|
||||
LokiAPI.workQueue.async {
|
||||
let uncheckedSymmetricKey = try? Curve25519.generateSharedSecret(fromPublicKey: LokiFileServerProxy.fileServerPublicKey, privateKey: keyPair.privateKey)
|
||||
guard let symmetricKey = uncheckedSymmetricKey else { return seal.reject(Error.symmetricKeyGenerationFailed) }
|
||||
LokiAPI.getFileServerProxy().then(on: DispatchQueue.global()) { proxy -> Promise<Any> in
|
||||
LokiAPI.getFileServerProxy().then(on: LokiAPI.workQueue) { proxy -> Promise<Any> in
|
||||
let url = "\(proxy.address):\(proxy.port)/file_proxy"
|
||||
guard let urlAsString = request.url?.absoluteString, let serverURLEndIndex = urlAsString.range(of: server)?.upperBound,
|
||||
serverURLEndIndex < urlAsString.endIndex else { throw Error.endpointParsingFailed }
|
||||
|
@ -103,7 +102,7 @@ internal class LokiFileServerProxy : LokiHTTPClient {
|
|||
}
|
||||
task.resume()
|
||||
return promise
|
||||
}.map(on: DispatchQueue.global()) { rawResponse in
|
||||
}.map(on: LokiAPI.workQueue) { rawResponse in
|
||||
guard let responseAsData = rawResponse as? Data, let responseAsJSON = try? JSONSerialization.jsonObject(with: responseAsData, options: .allowFragments) as? JSON, let base64EncodedCipherText = responseAsJSON["data"] as? String,
|
||||
let meta = responseAsJSON["meta"] as? JSON, let statusCode = meta["code"] as? Int, let cipherText = Data(base64Encoded: base64EncodedCipherText) else {
|
||||
print("[Loki] Received an invalid response.")
|
||||
|
@ -116,9 +115,9 @@ internal class LokiFileServerProxy : LokiHTTPClient {
|
|||
let uncheckedJSON = try? JSONSerialization.jsonObject(with: uncheckedJSONAsData, options: .allowFragments) as? JSON
|
||||
guard let json = uncheckedJSON else { throw HTTPError.networkError(code: -1, response: nil, underlyingError: Error.proxyResponseParsingFailed) }
|
||||
return json
|
||||
}.done(on: DispatchQueue.global()) { rawResponse in
|
||||
}.done(on: LokiAPI.workQueue) { rawResponse in
|
||||
seal.fulfill(rawResponse)
|
||||
}.catch(on: DispatchQueue.global()) { error in
|
||||
}.catch(on: LokiAPI.workQueue) { error in
|
||||
print("[Loki] File server proxy request failed with error: \(error.localizedDescription).")
|
||||
seal.reject(HTTPError.from(error: error) ?? error)
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ internal class LokiHTTPClient {
|
|||
securityPolicy.validatesDomainName = false
|
||||
result.securityPolicy = securityPolicy
|
||||
result.responseSerializer = AFHTTPResponseSerializer()
|
||||
result.completionQueue = DispatchQueue.global() // All of this has to happen on DispatchQueue.global() due to the way OWSMessageManager works
|
||||
result.completionQueue = LokiAPI.workQueue
|
||||
return result
|
||||
}()
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ public final class LokiPoller : NSObject {
|
|||
// randomElement() uses the system's default random generator, which is cryptographically secure
|
||||
let nextSnode = unusedSnodes.randomElement()!
|
||||
usedSnodes.insert(nextSnode)
|
||||
poll(nextSnode, seal: seal).done(on: DispatchQueue.global()) {
|
||||
poll(nextSnode, seal: seal).done(on: LokiAPI.workQueue) {
|
||||
seal.fulfill(())
|
||||
}.catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in
|
||||
print("[Loki] Polling \(nextSnode) failed; dropping it and switching to next snode.")
|
||||
|
@ -73,7 +73,7 @@ public final class LokiPoller : NSObject {
|
|||
}
|
||||
|
||||
private func poll(_ target: LokiAPITarget, seal longTermSeal: Resolver<Void>) -> Promise<Void> {
|
||||
return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise<Void> in
|
||||
return LokiAPI.getRawMessages(from: target, usingLongPolling: false).then(on: LokiAPI.workQueue) { [weak self] rawResponse -> Promise<Void> in
|
||||
guard let strongSelf = self, !strongSelf.hasStopped else { return Promise { $0.fulfill(()) } }
|
||||
let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target)
|
||||
strongSelf.onMessagesReceived(messages)
|
||||
|
|
|
@ -3,11 +3,9 @@ import PromiseKit
|
|||
|
||||
/// See the "Onion Requests" section of [The Session Whitepaper](https://arxiv.org/pdf/2002.04609.pdf) for more information.
|
||||
internal enum OnionRequestAPI {
|
||||
/// - Note: Exposed for testing purposes.
|
||||
internal static let workQueue = DispatchQueue(label: "OnionRequestAPI.workQueue", qos: .userInitiated)
|
||||
/// - Note: Must only be modified from `workQueue`.
|
||||
/// - Note: Must only be modified from `LokiAPI.workQueue`.
|
||||
internal static var guardSnodes: Set<LokiAPITarget> = []
|
||||
/// - Note: Must only be modified from `workQueue`.
|
||||
/// - Note: Must only be modified from `LokiAPI.workQueue`.
|
||||
internal static var paths: Set<Path> = []
|
||||
|
||||
private static var snodePool: Set<LokiAPITarget> {
|
||||
|
@ -79,8 +77,8 @@ internal enum OnionRequestAPI {
|
|||
return Promise<Set<LokiAPITarget>> { $0.fulfill(guardSnodes) }
|
||||
} else {
|
||||
print("[Loki] [Onion Request API] Populating guard snode cache.")
|
||||
return LokiAPI.getRandomSnode().then(on: workQueue) { _ -> Promise<Set<LokiAPITarget>> in // Just used to populate the snode pool
|
||||
var unusedSnodes = snodePool // Sync on workQueue
|
||||
return LokiAPI.getRandomSnode().then(on: LokiAPI.workQueue) { _ -> Promise<Set<LokiAPITarget>> in // Just used to populate the snode pool
|
||||
var unusedSnodes = snodePool // Sync on LokiAPI.workQueue
|
||||
guard unusedSnodes.count >= guardSnodeCount else { throw Error.insufficientSnodes }
|
||||
func getGuardSnode() -> Promise<LokiAPITarget> {
|
||||
// randomElement() uses the system's default random generator, which is cryptographically secure
|
||||
|
@ -88,10 +86,10 @@ internal enum OnionRequestAPI {
|
|||
unusedSnodes.remove(candidate) // All used snodes should be unique
|
||||
print("[Loki] [Onion Request API] Testing guard snode: \(candidate).")
|
||||
// Loop until a reliable guard snode is found
|
||||
return testSnode(candidate).map(on: workQueue) { candidate }.recover(on: workQueue) { _ in getGuardSnode() }
|
||||
return testSnode(candidate).map(on: LokiAPI.workQueue) { candidate }.recover(on: LokiAPI.workQueue) { _ in getGuardSnode() }
|
||||
}
|
||||
let promises = (0..<guardSnodeCount).map { _ in getGuardSnode() }
|
||||
return when(fulfilled: promises).map(on: workQueue) { guardSnodes in
|
||||
return when(fulfilled: promises).map(on: LokiAPI.workQueue) { guardSnodes in
|
||||
let guardSnodesAsSet = Set(guardSnodes)
|
||||
OnionRequestAPI.guardSnodes = guardSnodesAsSet
|
||||
return guardSnodesAsSet
|
||||
|
@ -104,8 +102,8 @@ internal enum OnionRequestAPI {
|
|||
/// if not enough (reliable) snodes are available.
|
||||
private static func buildPaths() -> Promise<Set<Path>> {
|
||||
print("[Loki] [Onion Request API] Building onion request paths.")
|
||||
return LokiAPI.getRandomSnode().then(on: workQueue) { _ -> Promise<Set<Path>> in // Just used to populate the snode pool
|
||||
return getGuardSnodes().map(on: workQueue) { guardSnodes in
|
||||
return LokiAPI.getRandomSnode().then(on: LokiAPI.workQueue) { _ -> Promise<Set<Path>> in // Just used to populate the snode pool
|
||||
return getGuardSnodes().map(on: LokiAPI.workQueue) { guardSnodes in
|
||||
var unusedSnodes = snodePool.subtracting(guardSnodes)
|
||||
let pathSnodeCount = guardSnodeCount * pathSize - guardSnodeCount
|
||||
guard unusedSnodes.count >= pathSnodeCount else { throw Error.insufficientSnodes }
|
||||
|
@ -135,7 +133,7 @@ internal enum OnionRequestAPI {
|
|||
seal.fulfill(paths.filter { !$0.contains(snode) }.randomElement()!)
|
||||
}
|
||||
} else {
|
||||
return buildPaths().map(on: workQueue) { paths in
|
||||
return buildPaths().map(on: LokiAPI.workQueue) { paths in
|
||||
let path = paths.filter { !$0.contains(snode) }.randomElement()!
|
||||
OnionRequestAPI.paths = paths
|
||||
return path
|
||||
|
@ -152,10 +150,10 @@ internal enum OnionRequestAPI {
|
|||
var guardSnode: LokiAPITarget!
|
||||
var targetSnodeSymmetricKey: Data! // Needed by invoke(_:on:with:) to decrypt the response sent back by the target snode
|
||||
var encryptionResult: EncryptionResult!
|
||||
return getPath(excluding: snode).then(on: workQueue) { path -> Promise<EncryptionResult> in
|
||||
return getPath(excluding: snode).then(on: LokiAPI.workQueue) { path -> Promise<EncryptionResult> in
|
||||
guardSnode = path.first!
|
||||
// Encrypt in reverse order, i.e. the target snode first
|
||||
return encrypt(payload, forTargetSnode: snode).then(on: workQueue) { r -> Promise<EncryptionResult> in
|
||||
return encrypt(payload, forTargetSnode: snode).then(on: LokiAPI.workQueue) { r -> Promise<EncryptionResult> in
|
||||
targetSnodeSymmetricKey = r.symmetricKey
|
||||
// Recursively encrypt the layers of the onion (again in reverse order)
|
||||
encryptionResult = r
|
||||
|
@ -166,7 +164,7 @@ internal enum OnionRequestAPI {
|
|||
return Promise<EncryptionResult> { $0.fulfill(encryptionResult) }
|
||||
} else {
|
||||
let lhs = path.removeLast()
|
||||
return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then(on: workQueue) { r -> Promise<EncryptionResult> in
|
||||
return OnionRequestAPI.encryptHop(from: lhs, to: rhs, using: encryptionResult).then(on: LokiAPI.workQueue) { r -> Promise<EncryptionResult> in
|
||||
encryptionResult = r
|
||||
rhs = lhs
|
||||
return addLayer()
|
||||
|
@ -175,7 +173,7 @@ internal enum OnionRequestAPI {
|
|||
}
|
||||
return addLayer()
|
||||
}
|
||||
}.map(on: workQueue) { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) }
|
||||
}.map(on: LokiAPI.workQueue) { _ in (guardSnode, encryptionResult, targetSnodeSymmetricKey) }
|
||||
}
|
||||
|
||||
// MARK: Internal API
|
||||
|
@ -183,9 +181,9 @@ internal enum OnionRequestAPI {
|
|||
internal static func sendOnionRequest(invoking method: LokiAPITarget.Method, on snode: LokiAPITarget, with parameters: JSON, associatedWith hexEncodedPublicKey: String) -> Promise<JSON> {
|
||||
let (promise, seal) = Promise<JSON>.pending()
|
||||
var guardSnode: LokiAPITarget!
|
||||
workQueue.async {
|
||||
LokiAPI.workQueue.async {
|
||||
let payload: JSON = [ "method" : method.rawValue, "params" : parameters ]
|
||||
buildOnion(around: payload, targetedAt: snode).done(on: workQueue) { intermediate in
|
||||
buildOnion(around: payload, targetedAt: snode).done(on: LokiAPI.workQueue) { intermediate in
|
||||
guardSnode = intermediate.guardSnode
|
||||
let url = "\(guardSnode.address):\(guardSnode.port)/onion_req"
|
||||
let finalEncryptionResult = intermediate.finalEncryptionResult
|
||||
|
@ -195,7 +193,7 @@ internal enum OnionRequestAPI {
|
|||
"ephemeral_key" : finalEncryptionResult.ephemeralPublicKey.toHexString()
|
||||
]
|
||||
let targetSnodeSymmetricKey = intermediate.targetSnodeSymmetricKey
|
||||
HTTP.execute(.post, url, parameters: parameters).done(on: workQueue) { rawResponse in
|
||||
HTTP.execute(.post, url, parameters: parameters).done(on: LokiAPI.workQueue) { rawResponse in
|
||||
guard let json = rawResponse as? JSON, let base64EncodedIVAndCiphertext = json["result"] as? String,
|
||||
let ivAndCiphertext = Data(base64Encoded: base64EncodedIVAndCiphertext) else { return seal.reject(HTTP.Error.invalidJSON) }
|
||||
let iv = ivAndCiphertext[0..<Int(ivSize)]
|
||||
|
@ -213,14 +211,14 @@ internal enum OnionRequestAPI {
|
|||
} catch (let error) {
|
||||
seal.reject(error)
|
||||
}
|
||||
}.catch(on: workQueue) { error in
|
||||
}.catch(on: LokiAPI.workQueue) { error in
|
||||
seal.reject(error)
|
||||
}
|
||||
}.catch(on: workQueue) { error in
|
||||
}.catch(on: LokiAPI.workQueue) { error in
|
||||
seal.reject(error)
|
||||
}
|
||||
}
|
||||
promise.catch(on: workQueue) { error in // Must be invoked on workQueue
|
||||
promise.catch(on: LokiAPI.workQueue) { error in // Must be invoked on LokiAPI.workQueue
|
||||
guard case HTTP.Error.httpRequestFailed(_, _) = error else { return }
|
||||
dropPath(containing: guardSnode) // A snode in the path is bad; retry with a different path
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue