Persist snode pool to database

This commit is contained in:
nielsandriesse 2020-05-28 11:38:44 +10:00
parent 0994169d09
commit 9778dace7d
5 changed files with 87 additions and 51 deletions

View File

@ -1519,7 +1519,7 @@ static NSTimeInterval launchStartedAt;
[ThreadUtil deleteAllContent];
[SSKEnvironment.shared.messageSenderJobQueue clearAllJobs];
[SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool];
[LKAPI clearSnodePool];
[self stopPollerIfNeeded];
[self stopOpenGroupPollersIfNeeded];
[self.lokiNewsFeedPoller stop];

View File

@ -679,7 +679,7 @@ typedef NS_ENUM(NSInteger, HomeViewControllerSection) {
[alert addAction:[UIAlertAction actionWithTitle:NSLocalizedString(@"OK", @"") style:UIAlertActionStyleDefault handler:^(UIAlertAction *action) {
[ThreadUtil deleteAllContent];
[SSKEnvironment.shared.identityManager clearIdentityKey];
[LKAPI clearRandomSnodePool];
[LKAPI clearSnodePool];
AppDelegate *appDelegate = (AppDelegate *)UIApplication.sharedApplication.delegate;
[appDelegate stopPollerIfNeeded];
[appDelegate stopOpenGroupPollersIfNeeded];

View File

@ -26,15 +26,26 @@ public extension LokiAPI {
// MARK: Clearnet Setup
fileprivate static let seedNodePool: Set<String> = [ "https://storage.seed1.loki.network", "https://storage.seed3.loki.network", "https://public.loki.foundation" ]
internal static var randomSnodePool: Set<LokiAPITarget> = []
internal static var snodePool: Set<LokiAPITarget> = []
@objc public static func clearRandomSnodePool() {
randomSnodePool.removeAll()
@objc public static func clearSnodePool() {
snodePool.removeAll()
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearSnodePool(in: transaction)
}
}
}
// MARK: Internal API
internal static func getRandomSnode() -> Promise<LokiAPITarget> {
if randomSnodePool.isEmpty {
if snodePool.isEmpty {
storage.dbReadConnection.read { transaction in
snodePool = storage.getSnodePool(in: transaction)
}
}
if snodePool.isEmpty {
let target = seedNodePool.randomElement()!
let url = "\(target)/json_rpc"
let parameters: JSON = [
@ -42,19 +53,16 @@ public extension LokiAPI {
"params" : [
"active_only" : true,
"fields" : [
"public_ip" : true,
"storage_port" : true,
"pubkey_ed25519" : true,
"pubkey_x25519" : true
"public_ip" : true, "storage_port" : true, "pubkey_ed25519" : true, "pubkey_x25519" : true
]
]
]
print("[Loki] Populating snode pool using: \(target).")
let (promise, seal) = Promise<LokiAPITarget>.pending()
attempt(maxRetryCount: 4, recoveringOn: DispatchQueue.global()) {
HTTP.execute(.post, url, parameters: parameters).map(on: DispatchQueue.global()) { json in
HTTP.execute(.post, url, parameters: parameters).map(on: DispatchQueue.global()) { json -> LokiAPITarget in
guard let intermediate = json["result"] as? JSON, let rawTargets = intermediate["service_node_states"] as? [JSON] else { throw LokiAPIError.randomSnodePoolUpdatingFailed }
randomSnodePool = try Set(rawTargets.flatMap { rawTarget in
snodePool = try Set(rawTargets.flatMap { rawTarget in
guard let address = rawTarget["public_ip"] as? String, let port = rawTarget["storage_port"] as? Int, let ed25519PublicKey = rawTarget["pubkey_ed25519"] as? String, let x25519PublicKey = rawTarget["pubkey_x25519"] as? String, address != "0.0.0.0" else {
print("[Loki] Failed to parse target from: \(rawTarget).")
return nil
@ -62,10 +70,17 @@ public extension LokiAPI {
return LokiAPITarget(address: "https://\(address)", port: UInt16(port), publicKeySet: LokiAPITarget.KeySet(ed25519Key: ed25519PublicKey, x25519Key: x25519PublicKey))
})
// randomElement() uses the system's default random generator, which is cryptographically secure
return randomSnodePool.randomElement()!
return snodePool.randomElement()!
}
}.done(on: DispatchQueue.global()) { snode in
seal.fulfill(snode)
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
print("[Loki] Persisting snode pool to database.")
storage.setSnodePool(LokiAPI.snodePool, in: transaction)
}
}
}.catch(on: DispatchQueue.global()) { error in
print("[Loki] Failed to contact seed node at: \(target).")
seal.reject(error)
@ -74,7 +89,7 @@ public extension LokiAPI {
} else {
return Promise<LokiAPITarget> { seal in
// randomElement() uses the system's default random generator, which is cryptographically secure
seal.fulfill(randomSnodePool.randomElement()!)
seal.fulfill(snodePool.randomElement()!)
}
}
}
@ -161,7 +176,14 @@ internal extension Promise {
if newFailureCount >= LokiAPI.failureThreshold {
print("[Loki] Failure threshold reached for: \(target); dropping it.")
LokiAPI.dropIfNeeded(target, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key
LokiAPI.randomSnodePool.remove(target) // Remove it from the random snode pool
LokiAPI.snodePool.remove(target) // Remove it from the snode pool
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.dropSnode(target, in: transaction)
}
}
LokiAPI.failureCount[target] = 0
}
case 406:

View File

@ -10,7 +10,7 @@ public enum OnionRequestAPI {
private static var snodePool: Set<LokiAPITarget> {
let unreliableSnodes = Set(LokiAPI.failureCount.keys)
return LokiAPI.randomSnodePool.subtracting(unreliableSnodes)
return LokiAPI.snodePool.subtracting(unreliableSnodes)
}
// MARK: Settings
@ -257,7 +257,14 @@ private extension Promise where T == JSON {
if newFailureCount >= LokiAPI.failureThreshold {
print("[Loki] Failure threshold reached for: \(snode); dropping it.")
LokiAPI.dropIfNeeded(snode, hexEncodedPublicKey: hexEncodedPublicKey) // Remove it from the swarm cache associated with the given public key
LokiAPI.randomSnodePool.remove(snode) // Remove it from the random snode pool
LokiAPI.snodePool.remove(snode) // Remove it from the snode pool
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearSnodePool(in: transaction)
}
}
LokiAPI.failureCount[snode] = 0
}
case 406:

View File

@ -1,7 +1,36 @@
public extension OWSPrimaryStorage {
// MARK: Session Requests
// MARK: - Snode Pool
private static let snodePoolCollection = "LokiSnodePoolCollection"
public func setSnodePool(_ snodePool: Set<LokiAPITarget>, in transaction: YapDatabaseReadWriteTransaction) {
clearSnodePool(in: transaction)
snodePool.forEach { snode in
transaction.setObject(snode, forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection)
}
}
public func clearSnodePool(in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeAllObjects(inCollection: OWSPrimaryStorage.snodePoolCollection)
}
public func getSnodePool(in transaction: YapDatabaseReadTransaction) -> Set<LokiAPITarget> {
var result: Set<LokiAPITarget> = []
transaction.enumerateKeysAndObjects(inCollection: OWSPrimaryStorage.snodePoolCollection) { _, object, _ in
guard let snode = object as? LokiAPITarget else { return }
result.insert(snode)
}
return result
}
public func dropSnode(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeObject(forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection)
}
// MARK: - Session Requests
private static let sessionRequestTimestampCollection = "LokiSessionRequestTimestampCollection"
public func setSessionRequestTimestamp(for publicKey: String, to timestamp: Date, in transaction: YapDatabaseReadWriteTransaction) {
@ -12,7 +41,9 @@ public extension OWSPrimaryStorage {
transaction.date(forKey: publicKey, inCollection: OWSPrimaryStorage.sessionRequestTimestampCollection)
}
// MARK: Multi Device
// MARK: - Multi Device
private static var deviceLinkCache: Set<DeviceLink> = []
private func getDeviceLinkCollection(for masterHexEncodedPublicKey: String) -> String {
@ -24,63 +55,39 @@ public extension OWSPrimaryStorage {
}
public func setDeviceLinks(_ deviceLinks: Set<DeviceLink>, in transaction: YapDatabaseReadWriteTransaction) {
// TODO: Clear collections first?
deviceLinks.forEach { addDeviceLink($0, in: transaction) } // TODO: Check the performance impact of this
deviceLinks.forEach { addDeviceLink($0, in: transaction) }
}
public func addDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) {
OWSPrimaryStorage.deviceLinkCache.insert(deviceLink)
/*
let collection = getDeviceLinkCollection(for: deviceLink.master.hexEncodedPublicKey)
transaction.setObject(deviceLink, forKey: deviceLink.slave.hexEncodedPublicKey, inCollection: collection)
*/
}
public func removeDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) {
OWSPrimaryStorage.deviceLinkCache.remove(deviceLink)
/*
let collection = getDeviceLinkCollection(for: deviceLink.master.hexEncodedPublicKey)
transaction.removeObject(forKey: deviceLink.slave.hexEncodedPublicKey, inCollection: collection)
*/
}
public func getDeviceLinks(for masterHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> Set<DeviceLink> {
return OWSPrimaryStorage.deviceLinkCache.filter { $0.master.hexEncodedPublicKey == masterHexEncodedPublicKey }
/*
let collection = getDeviceLinkCollection(for: masterHexEncodedPublicKey)
guard !transaction.allKeys(inCollection: collection).isEmpty else { return [] } // Fixes a crash that used to occur on Josh's device
var result: Set<DeviceLink> = []
transaction.enumerateRows(inCollection: collection) { _, object, _, _ in
guard let deviceLink = object as? DeviceLink else { return }
result.insert(deviceLink)
}
return result
*/
}
public func getDeviceLink(for slaveHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> DeviceLink? {
return OWSPrimaryStorage.deviceLinkCache.filter { $0.slave.hexEncodedPublicKey == slaveHexEncodedPublicKey }.first
/*
let query = YapDatabaseQuery(string: "WHERE \(DeviceLinkIndex.slaveHexEncodedPublicKey) = ?", parameters: [ slaveHexEncodedPublicKey ])
let deviceLinks = DeviceLinkIndex.getDeviceLinks(for: query, in: transaction)
guard deviceLinks.count <= 1 else {
print("[Loki] Found multiple device links for slave hex encoded public key: \(slaveHexEncodedPublicKey).")
return nil
}
return deviceLinks.first
*/
}
public func getMasterHexEncodedPublicKey(for slaveHexEncodedPublicKey: String, in transaction: YapDatabaseReadTransaction) -> String? {
return getDeviceLink(for: slaveHexEncodedPublicKey, in: transaction)?.master.hexEncodedPublicKey
}
// MARK: Open Groups
// MARK: - Open Groups
private static let openGroupUserCountCollection = "LokiPublicChatUserCountCollection"
public func getUserCount(for publicChat: LokiPublicChat, in transaction: YapDatabaseReadTransaction) -> Int? {
return transaction.object(forKey: publicChat.id, inCollection: "LokiPublicChatUserCountCollection") as? Int
return transaction.object(forKey: publicChat.id, inCollection: OWSPrimaryStorage.openGroupUserCountCollection) as? Int
}
public func setUserCount(_ userCount: Int, forPublicChatWithID publicChatID: String, in transaction: YapDatabaseReadWriteTransaction) {
transaction.setObject(userCount, forKey: publicChatID, inCollection: "LokiPublicChatUserCountCollection")
transaction.setObject(userCount, forKey: publicChatID, inCollection: OWSPrimaryStorage.openGroupUserCountCollection)
}
}