This commit is contained in:
nielsandriesse 2020-06-03 09:03:29 +10:00
parent 887eaf3ada
commit ab021d0e5e
4 changed files with 54 additions and 61 deletions

View File

@ -3,8 +3,12 @@ import PromiseKit
public extension LokiAPI {
private static var snodeVersion: [LokiAPITarget:String] = [:]
fileprivate static let seedNodePool: Set<String> = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ]
/// Only ever modified from `LokiAPI.errorHandlingQueue` to avoid race conditions.
internal static var snodeFailureCount: [LokiAPITarget:UInt] = [:]
internal static var snodePool: Set<LokiAPITarget> = []
internal static var swarmCache: [String:[LokiAPITarget]] = [:] // TODO: Make this set based?
// MARK: Settings
private static let minimumSnodePoolCount = 32
@ -13,39 +17,6 @@ public extension LokiAPI {
internal static let snodeFailureThreshold = 2
// MARK: Caching
internal static var swarmCache: [String:[LokiAPITarget]] = [:] // TODO: Make this set based?
internal static func dropSnodeFromSwarmIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) {
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
}
}
}
}
// MARK: Clearnet Setup
fileprivate static let seedNodePool: Set<String> = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ]
internal static var snodePool: Set<LokiAPITarget> = []
@objc public static func clearSnodePool() {
snodePool.removeAll()
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearSnodePool(in: transaction)
}
}
}
// MARK: Internal API
internal static func getRandomSnode() -> Promise<LokiAPITarget> {
if snodePool.count < minimumSnodePoolCount {
@ -135,6 +106,32 @@ public extension LokiAPI {
return getSwarm(for: hexEncodedPublicKey).map { Array($0.shuffled().prefix(targetSwarmSnodeCount)) }
}
internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) {
LokiAPI.snodePool.remove(target)
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.dropSnodeFromSnodePool(target, in: transaction)
}
}
}
internal static func dropSnodeFromSwarmIfNeeded(_ target: LokiAPITarget, hexEncodedPublicKey: String) {
let swarm = LokiAPI.swarmCache[hexEncodedPublicKey]
if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
}
}
}
}
internal static func getFileServerProxy() -> Promise<LokiAPITarget> {
let (promise, seal) = Promise<LokiAPITarget>.pending()
func getVersion(for snode: LokiAPITarget) -> Promise<String> {
@ -170,7 +167,18 @@ public extension LokiAPI {
}
return promise
}
// MARK: Public API
@objc public static func clearSnodePool() {
snodePool.removeAll()
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearSnodePool(in: transaction)
}
}
}
// MARK: Parsing
private static func parseTargets(from rawResponse: Any) -> [LokiAPITarget] {
guard let json = rawResponse as? JSON, let rawTargets = json["snodes"] as? [JSON] else {
@ -202,15 +210,8 @@ internal extension Promise {
print("[Loki] Couldn't reach snode at: \(target); setting failure count to \(newFailureCount).")
if newFailureCount >= LokiAPI.snodeFailureThreshold {
print("[Loki] Failure threshold reached for: \(target); dropping it.")
LokiAPI.dropSnodeFromSwarmIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key
LokiAPI.snodePool.remove(target) // Remove it from the snode pool
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.dropSnode(target, in: transaction)
}
}
LokiAPI.dropSnodeFromSwarmIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey)
LokiAPI.dropSnodeFromSnodePool(target)
LokiAPI.snodeFailureCount[target] = 0
}
case 406:

View File

@ -1,9 +1,7 @@
import PromiseKit
// TODO: A lot of the API relies on things happening serially and state being maintained correctly (i.e. without
// race conditions). To this end we should just have one high quality serial queue and do everything on there, except
// for things that explicitly *can* be done in parallel and don't modify state, which should then happen
// on a global queue.
// TODO: We guarantee that things happen in-order through promise chaining. For performance we should be able to use different queues for everything as long
// as we always modify state from the same queue.
@objc(LKAPI)
public final class LokiAPI : NSObject {

View File

@ -100,7 +100,7 @@ public enum OnionRequestAPI {
/// Builds and returns `pathCount` paths. The returned promise errors out with `Error.insufficientSnodes`
/// if not enough (reliable) snodes are available.
public static func buildPaths() -> Promise<[Path]> {
private static func buildPaths() -> Promise<[Path]> {
print("[Loki] [Onion Request API] Building onion request paths.")
DispatchQueue.main.async {
NotificationCenter.default.post(name: .buildingPaths, object: nil)
@ -146,6 +146,7 @@ public enum OnionRequestAPI {
let storage = OWSPrimaryStorage.shared()
storage.dbReadConnection.read { transaction in
paths = storage.getOnionRequestPaths(in: transaction)
guardSnodes.formUnion([ paths[0][0], paths[1][0] ])
}
}
// randomElement() uses the system's default random generator, which is cryptographically secure
@ -160,7 +161,7 @@ public enum OnionRequestAPI {
}
}
private static func dropPaths() {
private static func dropAllPaths() {
paths.removeAll()
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
@ -250,7 +251,7 @@ public enum OnionRequestAPI {
}
promise.catch(on: LokiAPI.workQueue) { error in // Must be invoked on LokiAPI.workQueue
guard case HTTP.Error.httpRequestFailed(_, _) = error else { return }
dropPaths() // A snode in the path is bad; retry with a different path
dropAllPaths() // A snode in the path is bad; retry with a different path
dropGuardSnode(guardSnode)
}
promise.handlingErrorsIfNeeded(forTargetSnode: snode, associatedWith: hexEncodedPublicKey)
@ -275,15 +276,8 @@ private extension Promise where T == JSON {
print("[Loki] Couldn't reach snode at: \(snode); setting failure count to \(newFailureCount).")
if newFailureCount >= LokiAPI.snodeFailureThreshold {
print("[Loki] Failure threshold reached for: \(snode); dropping it.")
LokiAPI.dropSnodeFromSwarmIfNeeded(snode, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key
LokiAPI.snodePool.remove(snode) // Remove it from the snode pool
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.dropSnode(snode, in: transaction)
}
}
LokiAPI.dropSnodeFromSwarmIfNeeded(snode, hexEncodedPublicKey: hexEncodedPublicKey)
LokiAPI.dropSnodeFromSnodePool(snode)
LokiAPI.snodeFailureCount[snode] = 0
}
case 406:

View File

@ -26,7 +26,7 @@ public extension OWSPrimaryStorage {
return result
}
public func dropSnode(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) {
public func dropSnodeFromSnodePool(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeObject(forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection)
}
@ -63,7 +63,7 @@ public extension OWSPrimaryStorage {
// MARK: - Onion Request Path
// MARK: - Onion Request Paths
private static let onionRequestPathCollection = "LokiOnionRequestPathCollection"
public func setOnionRequestPaths(_ paths: [OnionRequestAPI.Path], in transaction: YapDatabaseReadWriteTransaction) {