Implement swarm specific error handling

This commit is contained in:
Niels Andriesse 2019-05-22 16:54:57 +10:00
parent 5c481c38ba
commit 8a9da51943
2 changed files with 33 additions and 22 deletions

View file

@ -7,7 +7,7 @@ extension LokiAPI {
private static let defaultSnodePort: UInt16 = 8080
// MARK: Caching
private static var swarmCache: [String:[Target]] = [:]
fileprivate static var swarmCache: [String:[Target]] = [:] // TODO: Persist on disk
// MARK: Internal API
private static func getRandomSnode() -> Promise<Target> {
@ -21,7 +21,7 @@ extension LokiAPI {
return Promise<[Target]> { $0.fulfill(cachedSwarm) }
} else {
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ]
return getRandomSnode().then { invoke(.getSwarm, on: $0, with: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
return getRandomSnode().then { invoke(.getSwarm, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { parseTargets(from: $0) }.get { swarmCache[hexEncodedPublicKey] = $0 }
}
}
@ -44,3 +44,27 @@ extension LokiAPI {
// return addresses.map { Target(address: $0, port: defaultSnodePort) }
}
}
internal extension Promise {
func handlingSwarmSpecificErrorsIfNeeded(for target: LokiAPI.Target, associatedWith hexEncodedPublicKey: String) -> Promise<T> {
return recover { error -> Promise<T> in
if let error = error as? NetworkManagerError {
switch error.statusCode {
case 0:
// The snode is unreachable; usually a problem with LokiNet
Logger.warn("[Loki] There appears to be a problem with LokiNet.")
case 421:
// The snode isn't associated with the given public key anymore
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
}
default: break
}
}
throw error
}
}
}

View file

@ -13,9 +13,9 @@ import PromiseKit
let port: UInt16
enum Method : String {
/// Only applicable to snode targets.
/// Only supported by snode targets.
case getSwarm = "get_snodes_for_pubkey"
/// Only applicable to snode targets.
/// Only supported by snode targets.
case getMessages = "retrieve"
case sendMessage = "store"
}
@ -38,10 +38,10 @@ import PromiseKit
override private init() { }
// MARK: Internal API
internal static func invoke(_ method: Target.Method, on target: Target, with parameters: [String:Any] = [:]) -> Promise<RawResponse> {
internal static func invoke(_ method: Target.Method, on target: Target, associatedWith hexEncodedPublicKey: String, parameters: [String:Any] = [:]) -> Promise<RawResponse> {
let url = URL(string: "\(target.address):\(target.port)/\(version)/storage_rpc")!
let request = TSRequest(url: url, method: "POST", parameters: [ "method" : method.rawValue, "params" : parameters ])
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }
return TSNetworkManager.shared().makePromise(request: request).map { $0.responseObject }.handlingSwarmSpecificErrorsIfNeeded(for: target, associatedWith: hexEncodedPublicKey)
}
// MARK: Public API
@ -50,7 +50,7 @@ import PromiseKit
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { targetSnode in
let lastHash = getLastMessageHashValue(for: targetSnode) ?? ""
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey, "lastHash" : lastHash ]
return invoke(.getMessages, on: targetSnode, with: parameters).map { rawResponse in
return invoke(.getMessages, on: targetSnode, associatedWith: hexEncodedPublicKey, parameters: parameters).map { rawResponse in
guard let json = rawResponse as? JSON, let rawMessages = json["messages"] as? [JSON] else { return [] }
updateLastMessageHashValueIfPossible(for: targetSnode, from: rawMessages)
let newRawMessages = removeDuplicates(from: rawMessages)
@ -70,7 +70,7 @@ import PromiseKit
// TODO: Send using P2P protocol
} else {
let parameters = lokiMessage.toJSON()
return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) }
return getTargetSnodes(for: lokiMessage.destination).mapValues { invoke(.sendMessage, on: $0, associatedWith: lokiMessage.destination, parameters: parameters) }.map { Set($0) }
}
}
@ -80,7 +80,7 @@ import PromiseKit
// TODO: Send using P2P protocol
} else {
let parameters: [String:Any] = [ "pubKey" : hexEncodedPublicKey ] // TODO: Figure out correct parameters
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, with: parameters).recoverNetworkErrorIfNeeded(on: DispatchQueue.global()) }.map { Set($0) }
return getTargetSnodes(for: hexEncodedPublicKey).mapValues { invoke(.sendMessage, on: $0, associatedWith: hexEncodedPublicKey, parameters: parameters) }.map { Set($0) }
}
}
@ -169,16 +169,3 @@ private extension AnyPromise {
return result
}
}
// MARK: Error Handling
private extension Promise {
func recoverNetworkErrorIfNeeded(on queue: DispatchQueue) -> Promise<T> {
return recover(on: queue) { error -> Promise<T> in
switch error {
case NetworkManagerError.taskError(_, let underlyingError): throw underlyingError
default: throw error
}
}
}
}