Clean up SnodeAPI
This commit is contained in:
parent
15c0fd9414
commit
40d199a90b
|
@ -45,7 +45,7 @@ public final class Poller : NSObject {
|
|||
// MARK: Private API
|
||||
private func setUpPolling() {
|
||||
guard isPolling else { return }
|
||||
let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey(), isForcedReload: true).then2 { [weak self] _ -> Promise<Void> in
|
||||
let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey()).then2 { [weak self] _ -> Promise<Void> in
|
||||
guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
|
||||
strongSelf.usedSnodes.removeAll()
|
||||
let (promise, seal) = Promise<Void>.pending()
|
||||
|
@ -109,7 +109,7 @@ public final class Poller : NSObject {
|
|||
if strongSelf.pollCount == Poller.maxPollCount {
|
||||
throw Error.pollLimitReached
|
||||
} else {
|
||||
return withDelay(Poller.pollInterval, completionQueue: SnodeAPI.workQueue) {
|
||||
return withDelay(Poller.pollInterval, completionQueue: DispatchQueue.main) {
|
||||
guard let strongSelf = self, strongSelf.isPolling else { return Promise { $0.fulfill(()) } }
|
||||
return strongSelf.poll(snode, seal: longTermSeal)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,9 @@ import SessionUtilitiesKit
|
|||
|
||||
@objc(SNSnodeAPI)
|
||||
public final class SnodeAPI : NSObject {
|
||||
|
||||
private static var hasLoadedSnodePool = false
|
||||
private static var loadedSwarms: Set<String> = []
|
||||
|
||||
/// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions.
|
||||
internal static var snodeFailureCount: [Snode:UInt] = [:]
|
||||
/// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions.
|
||||
|
@ -11,12 +13,10 @@ public final class SnodeAPI : NSObject {
|
|||
|
||||
/// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions.
|
||||
public static var swarmCache: [String:Set<Snode>] = [:]
|
||||
public static var workQueue: DispatchQueue { Threading.workQueue } // Just to make things fit with legacy code
|
||||
|
||||
// MARK: Settings
|
||||
private static let maxRetryCount: UInt = 8
|
||||
private static let minimumSnodePoolCount = 64
|
||||
private static let minimumSwarmSnodeCount = 2
|
||||
private static let minimumSwarmSnodeCount = 3
|
||||
private static let seedNodePool: Set<String> = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ]
|
||||
private static let snodeFailureThreshold = 3
|
||||
private static let targetSwarmSnodeCount = 2
|
||||
|
@ -30,13 +30,13 @@ public final class SnodeAPI : NSObject {
|
|||
public enum Error : LocalizedError {
|
||||
case generic
|
||||
case clockOutOfSync
|
||||
case randomSnodePoolUpdatingFailed
|
||||
case snodePoolUpdatingFailed
|
||||
|
||||
public var errorDescription: String? {
|
||||
switch self {
|
||||
case .generic: return "An error occurred."
|
||||
case .clockOutOfSync: return "Your clock is out of sync with the service node network."
|
||||
case .randomSnodePoolUpdatingFailed: return "Failed to update random service node pool."
|
||||
case .snodePoolUpdatingFailed: return "Failed to update random service node pool."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -46,8 +46,68 @@ public final class SnodeAPI : NSObject {
|
|||
public typealias RawResponse = Any
|
||||
public typealias RawResponsePromise = Promise<RawResponse>
|
||||
|
||||
// MARK: Snode Pool Interaction
|
||||
private static func loadSnodePoolIfNeeded() {
|
||||
guard !hasLoadedSnodePool else { return }
|
||||
snodePool = SNSnodeKitConfiguration.shared.storage.getSnodePool()
|
||||
hasLoadedSnodePool = true
|
||||
}
|
||||
|
||||
private static func setSnodePool(to newValue: Set<Snode>, persist: Bool = true) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
snodePool = newValue
|
||||
guard persist else { return }
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: newValue, using: transaction)
|
||||
}
|
||||
}
|
||||
|
||||
private static func dropSnodeFromSnodePool(_ snode: Snode) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
var snodePool = SnodeAPI.snodePool
|
||||
snodePool.remove(snode)
|
||||
setSnodePool(to: snodePool)
|
||||
}
|
||||
|
||||
@objc public static func clearSnodePool() {
|
||||
snodePool.removeAll()
|
||||
setSnodePool(to: [])
|
||||
}
|
||||
|
||||
// MARK: Swarm Interaction
|
||||
private static func loadSwarmIfNeeded(for publicKey: String) {
|
||||
guard !loadedSwarms.contains(publicKey) else { return }
|
||||
swarmCache[publicKey] = SNSnodeKitConfiguration.shared.storage.getSwarm(for: publicKey)
|
||||
loadedSwarms.insert(publicKey)
|
||||
}
|
||||
|
||||
private static func setSwarm(to newValue: Set<Snode>, for publicKey: String, persist: Bool = true) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
swarmCache[publicKey] = newValue
|
||||
guard persist else { return }
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSwarm(to: newValue, for: publicKey, using: transaction)
|
||||
}
|
||||
}
|
||||
|
||||
public static func dropSnodeFromSwarmIfNeeded(_ snode: Snode, publicKey: String) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
let swarmOrNil = swarmCache[publicKey]
|
||||
guard var swarm = swarmOrNil, let index = swarm.firstIndex(of: snode) else { return }
|
||||
swarm.remove(at: index)
|
||||
setSwarm(to: swarm, for: publicKey)
|
||||
}
|
||||
|
||||
// MARK: Internal API
|
||||
public static func invoke(_ method: Snode.Method, on snode: Snode, associatedWith publicKey: String, parameters: JSON) -> RawResponsePromise {
|
||||
internal static func invoke(_ method: Snode.Method, on snode: Snode, associatedWith publicKey: String, parameters: JSON) -> RawResponsePromise {
|
||||
if useOnionRequests {
|
||||
return OnionRequestAPI.sendOnionRequest(to: snode, invoking: method, with: parameters, associatedWith: publicKey).map2 { $0 as Any }
|
||||
} else {
|
||||
|
@ -58,14 +118,12 @@ public final class SnodeAPI : NSObject {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
internal static func getRandomSnode() -> Promise<Snode> {
|
||||
if snodePool.count < minimumSnodePoolCount {
|
||||
snodePool = SNSnodeKitConfiguration.shared.storage.getSnodePool()
|
||||
}
|
||||
loadSnodePoolIfNeeded()
|
||||
let now = Date()
|
||||
let isSnodePoolExpired = given(Storage.shared.getLastSnodePoolRefreshDate()) { now.timeIntervalSince($0) > 24 * 60 * 60 } ?? true
|
||||
let isRefreshNeeded = (snodePool.count < minimumSnodePoolCount) || isSnodePoolExpired
|
||||
let isRefreshNeeded = (snodePool.isEmpty || isSnodePoolExpired)
|
||||
if isRefreshNeeded {
|
||||
SNSnodeKitConfiguration.shared.storage.write { transaction in
|
||||
Storage.shared.setLastSnodePoolRefreshDate(to: now, using: transaction)
|
||||
|
@ -86,28 +144,26 @@ public final class SnodeAPI : NSObject {
|
|||
Threading.workQueue.async {
|
||||
attempt(maxRetryCount: 4, recoveringOn: Threading.workQueue) {
|
||||
HTTP.execute(.post, url, parameters: parameters, useSSLURLSession: true).map2 { json -> Snode in
|
||||
guard let intermediate = json["result"] as? JSON, let rawSnodes = intermediate["service_node_states"] as? [JSON] else { throw Error.randomSnodePoolUpdatingFailed }
|
||||
snodePool = Set(rawSnodes.compactMap { rawSnode in
|
||||
guard let intermediate = json["result"] as? JSON, let rawSnodes = intermediate["service_node_states"] as? [JSON] else { throw Error.snodePoolUpdatingFailed }
|
||||
let snodePool: Set<Snode> = Set(rawSnodes.compactMap { rawSnode in
|
||||
guard let address = rawSnode["public_ip"] as? String, let port = rawSnode["storage_port"] as? Int,
|
||||
let ed25519PublicKey = rawSnode["pubkey_ed25519"] as? String, let x25519PublicKey = rawSnode["pubkey_x25519"] as? String, address != "0.0.0.0" else {
|
||||
SNLog("Failed to parse target from: \(rawSnode).")
|
||||
SNLog("Failed to parse snode from: \(rawSnode).")
|
||||
return nil
|
||||
}
|
||||
return Snode(address: "https://\(address)", port: UInt16(port), publicKeySet: Snode.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey))
|
||||
})
|
||||
setSnodePool(to: snodePool)
|
||||
// randomElement() uses the system's default random generator, which is cryptographically secure
|
||||
if !snodePool.isEmpty {
|
||||
return snodePool.randomElement()!
|
||||
} else {
|
||||
throw Error.randomSnodePoolUpdatingFailed
|
||||
throw Error.snodePoolUpdatingFailed
|
||||
}
|
||||
}
|
||||
}.done2 { snode in
|
||||
SNLog("Successfully updated snode pool.")
|
||||
seal.fulfill(snode)
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNLog("Persisting snode pool to database.")
|
||||
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: SnodeAPI.snodePool, using: transaction)
|
||||
}
|
||||
}.catch2 { error in
|
||||
SNLog("Failed to contact seed node at: \(target).")
|
||||
seal.reject(error)
|
||||
|
@ -122,50 +178,15 @@ public final class SnodeAPI : NSObject {
|
|||
}
|
||||
}
|
||||
|
||||
internal static func dropSnodeFromSnodePool(_ snode: Snode) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
var snodePool = SnodeAPI.snodePool
|
||||
snodePool.remove(snode)
|
||||
SnodeAPI.snodePool = snodePool
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: snodePool, using: transaction)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Public API
|
||||
@objc public static func clearSnodePool() {
|
||||
snodePool.removeAll()
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSnodePool(to: [], using: transaction)
|
||||
}
|
||||
}
|
||||
|
||||
public static func dropSnodeFromSwarmIfNeeded(_ snode: Snode, publicKey: String) {
|
||||
#if DEBUG
|
||||
dispatchPrecondition(condition: .onQueue(Threading.workQueue))
|
||||
#endif
|
||||
let swarm = SnodeAPI.swarmCache[publicKey]
|
||||
if var swarm = swarm, let index = swarm.firstIndex(of: snode) {
|
||||
swarm.remove(at: index)
|
||||
SnodeAPI.swarmCache[publicKey] = swarm
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static func getTargetSnodes(for publicKey: String) -> Promise<[Snode]> {
|
||||
// shuffled() uses the system's default random generator, which is cryptographically secure
|
||||
return getSwarm(for: publicKey).map2 { Array($0.shuffled().prefix(targetSwarmSnodeCount)) }
|
||||
}
|
||||
|
||||
public static func getSwarm(for publicKey: String, isForcedReload: Bool = false) -> Promise<Set<Snode>> {
|
||||
if swarmCache[publicKey] == nil {
|
||||
swarmCache[publicKey] = SNSnodeKitConfiguration.shared.storage.getSwarm(for: publicKey)
|
||||
}
|
||||
if let cachedSwarm = swarmCache[publicKey], cachedSwarm.count >= minimumSwarmSnodeCount && !isForcedReload {
|
||||
public static func getSwarm(for publicKey: String) -> Promise<Set<Snode>> {
|
||||
loadSwarmIfNeeded(for: publicKey)
|
||||
if let cachedSwarm = swarmCache[publicKey], cachedSwarm.count >= minimumSwarmSnodeCount {
|
||||
return Promise<Set<Snode>> { $0.fulfill(cachedSwarm) }
|
||||
} else {
|
||||
SNLog("Getting swarm for: \((publicKey == SNSnodeKitConfiguration.shared.storage.getUserPublicKey()) ? "self" : publicKey).")
|
||||
|
@ -176,23 +197,24 @@ public final class SnodeAPI : NSObject {
|
|||
}
|
||||
}.map2 { rawSnodes in
|
||||
let swarm = parseSnodes(from: rawSnodes)
|
||||
swarmCache[publicKey] = swarm
|
||||
SNSnodeKitConfiguration.shared.storage.writeSync { transaction in
|
||||
SNSnodeKitConfiguration.shared.storage.setSwarm(to: swarm, for: publicKey, using: transaction)
|
||||
}
|
||||
setSwarm(to: swarm, for: publicKey)
|
||||
return swarm
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static func getRawMessages(from snode: Snode, associatedWith publicKey: String) -> RawResponsePromise {
|
||||
let (promise, seal) = RawResponsePromise.pending()
|
||||
let storage = SNSnodeKitConfiguration.shared.storage
|
||||
storage.writeSync { transaction in
|
||||
storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction)
|
||||
Threading.workQueue.async {
|
||||
storage.writeSync { transaction in
|
||||
storage.pruneLastMessageHashInfoIfExpired(for: snode, associatedWith: publicKey, using: transaction)
|
||||
}
|
||||
let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? ""
|
||||
let parameters = [ "pubKey" : publicKey, "lastHash" : lastHash ]
|
||||
invoke(.getMessages, on: snode, associatedWith: publicKey, parameters: parameters).done2 { seal.fulfill($0) }.catch2 { seal.reject($0) }
|
||||
}
|
||||
let lastHash = storage.getLastMessageHash(for: snode, associatedWith: publicKey) ?? ""
|
||||
let parameters = [ "pubKey" : publicKey, "lastHash" : lastHash ]
|
||||
return invoke(.getMessages, on: snode, associatedWith: publicKey, parameters: parameters)
|
||||
return promise
|
||||
}
|
||||
|
||||
public static func getMessages(for publicKey: String) -> Promise<Set<MessageListPromise>> {
|
||||
|
|
Loading…
Reference in New Issue