From 40d199a90bb687f58dacb2438fa268a2a0c8c3a1 Mon Sep 17 00:00:00 2001 From: Niels Andriesse Date: Fri, 18 Dec 2020 15:17:23 +1100 Subject: [PATCH] Clean up SnodeAPI --- .../Sending & Receiving/Pollers/Poller.swift | 4 +- SessionSnodeKit/SnodeAPI.swift | 158 ++++++++++-------- 2 files changed, 92 insertions(+), 70 deletions(-) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index dc0a2610c..6c338c0c4 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -45,7 +45,7 @@ public final class Poller : NSObject { // MARK: Private API private func setUpPolling() { guard isPolling else { return } - let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey(), isForcedReload: true).then2 { [weak self] _ -> Promise in + let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise in guard let strongSelf = self else { return Promise { $0.fulfill(()) } } strongSelf.usedSnodes.removeAll() let (promise, seal) = Promise.pending() @@ -109,7 +109,7 @@ public final class Poller : NSObject { if strongSelf.pollCount == Poller.maxPollCount { throw Error.pollLimitReached } else { - return withDelay(Poller.pollInterval, completionQueue: SnodeAPI.workQueue) { + return withDelay(Poller.pollInterval, completionQueue: DispatchQueue.main) { guard let strongSelf = self, strongSelf.isPolling else { return Promise { $0.fulfill(()) } } return strongSelf.poll(snode, seal: longTermSeal) } diff --git a/SessionSnodeKit/SnodeAPI.swift b/SessionSnodeKit/SnodeAPI.swift index f4527975b..eb0d8c312 100644 --- a/SessionSnodeKit/SnodeAPI.swift +++ b/SessionSnodeKit/SnodeAPI.swift @@ -3,7 +3,9 @@ import SessionUtilitiesKit @objc(SNSnodeAPI) public final class SnodeAPI : NSObject { - + private static var hasLoadedSnodePool = false + private static var loadedSwarms: Set = [] + /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. internal static var snodeFailureCount: [Snode:UInt] = [:] /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. @@ -11,12 +13,10 @@ public final class SnodeAPI : NSObject { /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. public static var swarmCache: [String:Set] = [:] - public static var workQueue: DispatchQueue { Threading.workQueue } // Just to make things fit with legacy code // MARK: Settings private static let maxRetryCount: UInt = 8 - private static let minimumSnodePoolCount = 64 - private static let minimumSwarmSnodeCount = 2 + private static let minimumSwarmSnodeCount = 3 private static let seedNodePool: Set = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ] private static let snodeFailureThreshold = 3 private static let targetSwarmSnodeCount = 2 @@ -30,13 +30,13 @@ public final class SnodeAPI : NSObject { public enum Error : LocalizedError { case generic case clockOutOfSync - case randomSnodePoolUpdatingFailed + case snodePoolUpdatingFailed public var errorDescription: String? { switch self { case .generic: return "An error occurred." case .clockOutOfSync: return "Your clock is out of sync with the service node network." - case .randomSnodePoolUpdatingFailed: return "Failed to update random service node pool." + case .snodePoolUpdatingFailed: return "Failed to update random service node pool." } } } @@ -46,8 +46,68 @@ public final class SnodeAPI : NSObject { public typealias RawResponse = Any public typealias RawResponsePromise = Promise + // MARK: Snode Pool Interaction + private static func loadSnodePoolIfNeeded() { + guard !hasLoadedSnodePool else { return } + snodePool = SNSnodeKitConfiguration.shared.storage.getSnodePool() + hasLoadedSnodePool = true + } + + private static func setSnodePool(to newValue: Set, persist: Bool = true) { + #if DEBUG + dispatchPrecondition(condition: .onQueue(Threading.workQueue)) + #endif + snodePool = newValue + guard persist else { return } + SNSnodeKitConfiguration.shared.storage.writeSync { transaction in + SNSnodeKitConfiguration.shared.storage.setSnodePool(to: newValue, using: transaction) + } + } + + private static func dropSnodeFromSnodePool(_ snode: Snode) { + #if DEBUG + dispatchPrecondition(condition: .onQueue(Threading.workQueue)) + #endif + var snodePool = SnodeAPI.snodePool + snodePool.remove(snode) + setSnodePool(to: snodePool) + } + + @objc public static func clearSnodePool() { + snodePool.removeAll() + setSnodePool(to: []) + } + + // MARK: Swarm Interaction + private static func loadSwarmIfNeeded(for publicKey: String) { + guard !loadedSwarms.contains(publicKey) else { return } + swarmCache[publicKey] = SNSnodeKitConfiguration.shared.storage.getSwarm(for: publicKey) + loadedSwarms.insert(publicKey) + } + + private static func setSwarm(to newValue: Set, for publicKey: String, persist: Bool = true) { + #if DEBUG + dispatchPrecondition(condition: .onQueue(Threading.workQueue)) + #endif + swarmCache[publicKey] = newValue + guard persist else { return } + SNSnodeKitConfiguration.shared.storage.writeSync { transaction in + SNSnodeKitConfiguration.shared.storage.setSwarm(to: newValue, for: publicKey, using: transaction) + } + } + + public static func dropSnodeFromSwarmIfNeeded(_ snode: Snode, publicKey: String) { + #if DEBUG + dispatchPrecondition(condition: .onQueue(Threading.workQueue)) + #endif + let swarmOrNil = swarmCache[publicKey] + guard var swarm = swarmOrNil, let index = swarm.firstIndex(of: snode) else { return } + swarm.remove(at: index) + setSwarm(to: swarm, for: publicKey) + } + // MARK: Internal API - public static func invoke(_ method: Snode.Method, on snode: Snode, associatedWith publicKey: String, parameters: JSON) -> RawResponsePromise { + internal static func invoke(_ method: Snode.Method, on snode: Snode, associatedWith publicKey: String, parameters: JSON) -> RawResponsePromise { if useOnionRequests { return OnionRequestAPI.sendOnionRequest(to: snode, invoking: method, with: parameters, associatedWith: publicKey).map2 { $0 as Any } } else { @@ -58,14 +118,12 @@ public final class SnodeAPI : NSObject { } } } - + internal static func getRandomSnode() -> Promise { - if snodePool.count < minimumSnodePoolCount { - snodePool = SNSnodeKitConfiguration.shared.storage.getSnodePool() - } + loadSnodePoolIfNeeded() let now = Date() let isSnodePoolExpired = given(Storage.shared.getLastSnodePoolRefreshDate()) { now.timeIntervalSince($0) > 24 * 60 * 60 } ?? true - let isRefreshNeeded = (snodePool.count < minimumSnodePoolCount) || isSnodePoolExpired + let isRefreshNeeded = (snodePool.isEmpty || isSnodePoolExpired) if isRefreshNeeded { SNSnodeKitConfiguration.shared.storage.write { transaction in Storage.shared.setLastSnodePoolRefreshDate(to: now, using: transaction) @@ -86,28 +144,26 @@ public final class SnodeAPI : NSObject { Threading.workQueue.async { attempt(maxRetryCount: 4, recoveringOn: Threading.workQueue) { HTTP.execute(.post, url, parameters: parameters, useSSLURLSession: true).map2 { json -> Snode in - guard let intermediate = json["result"] as? JSON, let rawSnodes = intermediate["service_node_states"] as? [JSON] else { throw Error.randomSnodePoolUpdatingFailed } - snodePool = Set(rawSnodes.compactMap { rawSnode in + guard let intermediate = json["result"] as? JSON, let rawSnodes = intermediate["service_node_states"] as? [JSON] else { throw Error.snodePoolUpdatingFailed } + let snodePool: Set = Set(rawSnodes.compactMap { rawSnode in guard let address = rawSnode["public_ip"] as? String, let port = rawSnode["storage_port"] as? Int, let ed25519PublicKey = rawSnode["pubkey_ed25519"] as? String, let x25519PublicKey = rawSnode["pubkey_x25519"] as? String, address != "0.0.0.0" else { - SNLog("Failed to parse target from: \(rawSnode).") + SNLog("Failed to parse snode from: \(rawSnode).") return nil } return Snode(address: "https://\(address)", port: UInt16(port), publicKeySet: Snode.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey)) }) + setSnodePool(to: snodePool) // randomElement() uses the system's default random generator, which is cryptographically secure if !snodePool.isEmpty { return snodePool.randomElement()! } else { - throw Error.randomSnodePoolUpdatingFailed + throw Error.snodePoolUpdatingFailed } } }.done2 { snode in + SNLog("Successfully updated snode pool.") seal.fulfill(snode) - SNSnodeKitConfiguration.shared.storage.writeSync { transaction in - SNLog("Persisting snode pool to database.") - SNSnodeKitConfiguration.shared.storage.setSnodePool(to: SnodeAPI.snodePool, using: transaction) - } }.catch2 { error in SNLog("Failed to contact seed node at: \(target).") seal.reject(error) @@ -122,50 +178,15 @@ public final class SnodeAPI : NSObject { } } - internal static func dropSnodeFromSnodePool(_ snode: Snode) { - #if DEBUG - dispatchPrecondition(condition: .onQueue(Threading.workQueue)) - #endif - var snodePool = SnodeAPI.snodePool - snodePool.remove(snode) - SnodeAPI.snodePool = snodePool - SNSnodeKitConfiguration.shared.storage.writeSync { transaction in - SNSnodeKitConfiguration.shared.storage.setSnodePool(to: snodePool, using: transaction) - } - } - // MARK: Public API - @objc public static func clearSnodePool() { - snodePool.removeAll() - SNSnodeKitConfiguration.shared.storage.writeSync { transaction in - SNSnodeKitConfiguration.shared.storage.setSnodePool(to: [], using: transaction) - } - } - - public static func dropSnodeFromSwarmIfNeeded(_ snode: Snode, publicKey: String) { - #if DEBUG - dispatchPrecondition(condition: .onQueue(Threading.workQueue)) - #endif - let swarm = SnodeAPI.swarmCache[publicKey] - if var swarm = swarm, let index = swarm.firstIndex(of: snode) { - swarm.remove(at: index) - SnodeAPI.swarmCache[publicKey] = swarm - SNSnodeKitConfiguration.shared.storage.writeSync { transaction in - SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction) - } - } - } - public static func getTargetSnodes(for publicKey: String) -> Promise<[Snode]> { // shuffled() uses the system's default random generator, which is cryptographically secure return getSwarm(for: publicKey).map2 { Array($0.shuffled().prefix(targetSwarmSnodeCount)) } } - public static func getSwarm(for publicKey: String, isForcedReload: Bool = false) -> Promise> { - if swarmCache[publicKey] == nil { - swarmCache[publicKey] = SNSnodeKitConfiguration.shared.storage.getSwarm(for: publicKey) - } - if let cachedSwarm = swarmCache[publicKey], cachedSwarm.count >= minimumSwarmSnodeCount && !isForcedReload { + public static func getSwarm(for publicKey: String) -> Promise> { + loadSwarmIfNeeded(for: publicKey) + if let cachedSwarm = swarmCache[publicKey], cachedSwarm.count >= minimumSwarmSnodeCount { return Promise> { $0.fulfill(cachedSwarm) } } else { SNLog("Getting swarm for: \((publicKey == SNSnodeKitConfiguration.shared.storage.getUserPublicKey()) ? "self" : publicKey).") @@ -176,23 +197,24 @@ public final class SnodeAPI : NSObject { } }.map2 { rawSnodes in let swarm = parseSnodes(from: rawSnodes) - swarmCache[publicKey] = swarm - SNSnodeKitConfiguration.shared.storage.writeSync { transaction in - SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction) - } + setSwarm(to: swarm, for: publicKey) return swarm } } } - + public static func getRawMessages(from snode: Snode, associatedWith publicKey: String) -> RawResponsePromise { + let (promise, seal) = RawResponsePromise.pending() let storage = SNSnodeKitConfiguration.shared.storage - storage.writeSync { transaction in - storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction) + Threading.workQueue.async { + storage.writeSync { transaction in + storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction) + } + let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? "" + let parameters = [ "pubKey" : publicKey, "lastHash" : lastHash ] + invoke(.getMessages, on: snode, associatedWith: publicKey, parameters: parameters).done2 { seal.fulfill($0) }.catch2 { seal.reject($0) } } - let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? "" - let parameters = [ "pubKey" : publicKey, "lastHash" : lastHash ] - return invoke(.getMessages, on: snode, associatedWith: publicKey, parameters: parameters) + return promise } public static func getMessages(for publicKey: String) -> Promise> {