// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import Foundation import Combine import Sodium import GRDB import SessionUtilitiesKit public final class SnodeAPI { internal static let sodium: Atomic = Atomic(Sodium()) private static var hasLoadedSnodePool: Atomic = Atomic(false) private static var loadedSwarms: Atomic> = Atomic([]) private static var getSnodePoolPublisher: Atomic, Error>?> = Atomic(nil) /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. internal static var snodeFailureCount: Atomic<[Snode: UInt]> = Atomic([:]) /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. internal static var snodePool: Atomic> = Atomic([]) /// The offset between the user's clock and the Service Node's clock. Used in cases where the /// user's clock is incorrect. /// /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. public static var clockOffset: Atomic = Atomic(0) /// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions. public static var swarmCache: Atomic<[String: Set]> = Atomic([:]) // MARK: - Hardfork version public static var hardfork = UserDefaults.standard[.hardfork] public static var softfork = UserDefaults.standard[.softfork] // MARK: - Settings private static let maxRetryCount: Int = 8 private static let minSwarmSnodeCount: Int = 3 private static let seedNodePool: Set = { guard !Features.useTestnet else { return [ "http://public.loki.foundation:38157" ] } return [ "https://storage.seed1.loki.network:4433", "https://storage.seed3.loki.network:4433", "https://public.loki.foundation:4433" ] }() private static let snodeFailureThreshold: Int = 3 private static let targetSwarmSnodeCount: Int = 2 private static let minSnodePoolCount: Int = 12 private static func offsetTimestampMsNow() -> UInt64 { return UInt64( Int64(floor(Date().timeIntervalSince1970 * 1000)) + SnodeAPI.clockOffset.wrappedValue ) } // MARK: Snode Pool Interaction private static var hasInsufficientSnodes: Bool { snodePool.wrappedValue.count < minSnodePoolCount } private static func loadSnodePoolIfNeeded() { guard !hasLoadedSnodePool.wrappedValue else { return } let fetchedSnodePool: Set = Storage.shared .read { db in try Snode.fetchSet(db) } .defaulting(to: []) snodePool.mutate { $0 = fetchedSnodePool } hasLoadedSnodePool.mutate { $0 = true } } private static func setSnodePool(to newValue: Set, db: Database? = nil) { snodePool.mutate { $0 = newValue } if let db: Database = db { _ = try? Snode.deleteAll(db) newValue.forEach { try? $0.save(db) } } else { Storage.shared.write { db in _ = try? Snode.deleteAll(db) newValue.forEach { try? $0.save(db) } } } } private static func dropSnodeFromSnodePool(_ snode: Snode) { #if DEBUG dispatchPrecondition(condition: .onQueue(Threading.workQueue)) #endif var snodePool = SnodeAPI.snodePool.wrappedValue snodePool.remove(snode) setSnodePool(to: snodePool) } @objc public static func clearSnodePool() { snodePool.mutate { $0.removeAll() } Threading.workQueue.async { setSnodePool(to: []) } } // MARK: - Swarm Interaction private static func loadSwarmIfNeeded(for publicKey: String) { guard !loadedSwarms.wrappedValue.contains(publicKey) else { return } let updatedCacheForKey: Set = Storage.shared .read { db in try Snode.fetchSet(db, publicKey: publicKey) } .defaulting(to: []) swarmCache.mutate { $0[publicKey] = updatedCacheForKey } loadedSwarms.mutate { $0.insert(publicKey) } } private static func setSwarm(to newValue: Set, for publicKey: String, persist: Bool = true) { swarmCache.mutate { $0[publicKey] = newValue } guard persist else { return } Storage.shared.write { db in try? newValue.save(db, key: publicKey) } } public static func dropSnodeFromSwarmIfNeeded(_ snode: Snode, publicKey: String) { let swarmOrNil = swarmCache.wrappedValue[publicKey] guard var swarm = swarmOrNil, let index = swarm.firstIndex(of: snode) else { return } swarm.remove(at: index) setSwarm(to: swarm, for: publicKey) } // MARK: - Public API public static func hasCachedSnodesInclusingExpired() -> Bool { loadSnodePoolIfNeeded() return !hasInsufficientSnodes } public static func getSnodePool() -> AnyPublisher, Error> { loadSnodePoolIfNeeded() let now: Date = Date() let hasSnodePoolExpired: Bool = given(Storage.shared[.lastSnodePoolRefreshDate]) { now.timeIntervalSince($0) > 2 * 60 * 60 }.defaulting(to: true) let snodePool: Set = SnodeAPI.snodePool.wrappedValue guard hasInsufficientSnodes || hasSnodePoolExpired else { return Just(snodePool) .setFailureType(to: Error.self) .eraseToAnyPublisher() } if let getSnodePoolPublisher: AnyPublisher, Error> = getSnodePoolPublisher.wrappedValue { return getSnodePoolPublisher } let publisher: AnyPublisher, Error> if snodePool.count < minSnodePoolCount { publisher = getSnodePoolFromSeedNode() } else { publisher = getSnodePoolFromSnode() .catch { _ in getSnodePoolFromSeedNode() } .eraseToAnyPublisher() } getSnodePoolPublisher.mutate { $0 = publisher } return publisher .flatMap { snodePool -> AnyPublisher, Error> in guard !snodePool.isEmpty else { return Fail(error: SnodeAPIError.snodePoolUpdatingFailed) .eraseToAnyPublisher() } return Storage.shared .writePublisher { db in db[.lastSnodePoolRefreshDate] = now setSnodePool(to: snodePool, db: db) return snodePool } .eraseToAnyPublisher() } .handleEvents( receiveCompletion: { _ in getSnodePoolPublisher.mutate { $0 = nil } } ) .eraseToAnyPublisher() } public static func getSessionID(for onsName: String) -> AnyPublisher { let validationCount = 3 // The name must be lowercased let onsName = onsName.lowercased() // Hash the ONS name using BLAKE2b let nameAsData = [UInt8](onsName.data(using: String.Encoding.utf8)!) guard let nameHash = sodium.wrappedValue.genericHash.hash(message: nameAsData) else { return Fail(error: SnodeAPIError.hashingFailed) .eraseToAnyPublisher() } // Ask 3 different snodes for the Session ID associated with the given name hash let base64EncodedNameHash = nameHash.toBase64() return Publishers .MergeMany( (0.. AnyPublisher in SnodeAPI .send( request: SnodeRequest( endpoint: .oxenDaemonRPCCall, body: OxenDaemonRPCRequest( endpoint: .daemonOnsResolve, body: ONSResolveRequest( type: 0, // type 0 means Session base64EncodedNameHash: base64EncodedNameHash ) ) ), to: snode, associatedWith: nil ) .decoded(as: ONSResolveResponse.self) .flatMap { _, response -> AnyPublisher in do { let result: String = try response.sessionId( sodium: sodium.wrappedValue, nameBytes: nameAsData, nameHashBytes: nameHash ) return Just(result) .setFailureType(to: Error.self) .eraseToAnyPublisher() } catch { return Fail(error: error) .eraseToAnyPublisher() } } .retry(4) .eraseToAnyPublisher() } } ) .subscribe(on: Threading.workQueue) .collect() .mapError { _ in SnodeAPIError.validationFailed } .flatMap { results -> AnyPublisher in guard results.count == validationCount, Set(results).count == 1 else { return Fail(error: SnodeAPIError.validationFailed) .eraseToAnyPublisher() } return Just(results[0]) .setFailureType(to: Error.self) .eraseToAnyPublisher() } .eraseToAnyPublisher() } public static func getTargetSnodes(for publicKey: String) -> AnyPublisher<[Snode], Error> { // shuffled() uses the system's default random generator, which is cryptographically secure return getSwarm(for: publicKey) .map { Array($0.shuffled().prefix(targetSwarmSnodeCount)) } .eraseToAnyPublisher() } public static func getSwarm( for publicKey: String, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher, Error> { loadSwarmIfNeeded(for: publicKey) if let cachedSwarm = swarmCache.wrappedValue[publicKey], cachedSwarm.count >= minSwarmSnodeCount { return Just(cachedSwarm) .setFailureType(to: Error.self) .eraseToAnyPublisher() } SNLog("Getting swarm for: \((publicKey == getUserHexEncodedPublicKey()) ? "self" : publicKey).") let targetPublicKey: String = (Features.useTestnet ? publicKey.removingIdPrefixIfNeeded() : publicKey ) return getRandomSnode() .flatMap { snode in SnodeAPI.send( request: SnodeRequest( endpoint: .getSwarm, body: GetSwarmRequest(pubkey: targetPublicKey) ), to: snode, associatedWith: publicKey, using: dependencies ) .retry(4) .eraseToAnyPublisher() } .map { _, responseData in parseSnodes(from: responseData) } .handleEvents( receiveOutput: { swarm in setSwarm(to: swarm, for: publicKey) } ) .eraseToAnyPublisher() } // MARK: - Retrieve public static func getMessages( in namespaces: [SnodeAPI.Namespace], from snode: Snode, associatedWith publicKey: String, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<[SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)], Error> { let targetPublicKey: String = (Features.useTestnet ? publicKey.removingIdPrefixIfNeeded() : publicKey ) var userED25519KeyPair: Box.KeyPair? return Just(()) .setFailureType(to: Error.self) .flatMap { _ -> Future<[SnodeAPI.Namespace: String], Error> in Future<[SnodeAPI.Namespace: String], Error> { resolver in let namespaceLastHash: [SnodeAPI.Namespace: String] = namespaces .reduce(into: [:]) { result, namespace in // Prune expired message hashes for this namespace on this service node SnodeReceivedMessageInfo.pruneExpiredMessageHashInfo( for: snode, namespace: namespace, associatedWith: publicKey ) let maybeLastHash: String? = SnodeReceivedMessageInfo .fetchLastNotExpired( for: snode, namespace: namespace, associatedWith: publicKey )? .hash guard let lastHash: String = maybeLastHash else { return } result[namespace] = lastHash } resolver(Result.success(namespaceLastHash)) } } .flatMap { namespaceLastHash -> AnyPublisher<[SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)], Error> in let requests: [SnodeAPI.BatchRequest.Info] do { requests = try namespaces .map { namespace -> SnodeAPI.BatchRequest.Info in // Check if this namespace requires authentication guard namespace.requiresReadAuthentication else { return BatchRequest.Info( request: SnodeRequest( endpoint: .getMessages, body: LegacyGetMessagesRequest( pubkey: targetPublicKey, lastHash: (namespaceLastHash[namespace] ?? ""), namespace: namespace ) ), responseType: GetMessagesResponse.self ) } // Generate the signature guard let keyPair: Box.KeyPair = (userED25519KeyPair ?? Storage.shared.read { db in Identity.fetchUserEd25519KeyPair(db) }) else { throw SnodeAPIError.signingFailed } userED25519KeyPair = keyPair return BatchRequest.Info( request: SnodeRequest( endpoint: .getMessages, body: GetMessagesRequest( lastHash: (namespaceLastHash[namespace] ?? ""), namespace: namespace, pubkey: targetPublicKey, subkey: nil, timestampMs: SnodeAPI.offsetTimestampMsNow(), ed25519PublicKey: keyPair.publicKey, ed25519SecretKey: keyPair.secretKey ) ), responseType: GetMessagesResponse.self ) } } catch { return Fail(error: error) .eraseToAnyPublisher() } let responseTypes = requests.map { $0.responseType } return SnodeAPI .send( request: SnodeRequest( endpoint: .batch, body: BatchRequest(requests: requests) ), to: snode, associatedWith: publicKey, using: dependencies ) .decoded(as: responseTypes, using: dependencies) .map { batchResponse -> [SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)] in zip(namespaces, batchResponse) .reduce(into: [:]) { result, next in guard let messageResponse: GetMessagesResponse = (next.1.1 as? HTTP.BatchSubResponse)?.body else { return } let namespace: SnodeAPI.Namespace = next.0 let requestInfo: ResponseInfoType = next.1.0 result[namespace] = ( requestInfo, ( messageResponse.messages .compactMap { rawMessage -> SnodeReceivedMessage? in SnodeReceivedMessage( snode: snode, publicKey: publicKey, namespace: namespace, rawMessage: rawMessage ) }, namespaceLastHash[namespace] ) ) } } .eraseToAnyPublisher() } .eraseToAnyPublisher() } // MARK: Store public static func sendMessage( _ message: SnodeMessage, in namespace: Namespace, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<(Result<(ResponseInfoType, SendMessagesResponse), Error>, Int), Error> { let publicKey: String = (Features.useTestnet ? message.recipient.removingIdPrefixIfNeeded() : message.recipient ) // Create a convenience method to send a message to an individual Snode func sendMessage(to snode: Snode) -> AnyPublisher<(any ResponseInfoType, SendMessagesResponse), Error> { guard namespace.requiresWriteAuthentication else { return SnodeAPI .send( request: SnodeRequest( endpoint: .sendMessage, body: LegacySendMessagesRequest( message: message, namespace: namespace ) ), to: snode, associatedWith: publicKey, using: dependencies ) .decoded(as: SendMessagesResponse.self, using: dependencies) .eraseToAnyPublisher() } guard let userED25519KeyPair: Box.KeyPair = Storage.shared.read({ db in Identity.fetchUserEd25519KeyPair(db) }) else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } return SnodeAPI .send( request: SnodeRequest( endpoint: .sendMessage, body: SendMessageRequest( message: message, namespace: namespace, subkey: nil, timestampMs: SnodeAPI.offsetTimestampMsNow(), ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey ) ), to: snode, associatedWith: publicKey, using: dependencies ) .decoded(as: SendMessagesResponse.self, using: dependencies) .eraseToAnyPublisher() } return getTargetSnodes(for: publicKey) .subscribe(on: Threading.workQueue) .flatMap { targetSnodes -> AnyPublisher<(Result<(ResponseInfoType, SendMessagesResponse), Error>, Int), Error> in Publishers .MergeMany( targetSnodes .map { targetSnode -> AnyPublisher, Never> in return sendMessage(to: targetSnode) .retry(maxRetryCount) .eraseToAnyPublisher() .asResult() } ) .map { result in (result, targetSnodes.count) } .setFailureType(to: Error.self) .eraseToAnyPublisher() } .eraseToAnyPublisher() } // MARK: Edit public static func updateExpiry( publicKey: String, serverHashes: [String], updatedExpiryMs: UInt64, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<[String: (hashes: [String], expiry: UInt64)], Error> { guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } let publicKey: String = (Features.useTestnet ? publicKey.removingIdPrefixIfNeeded() : publicKey ) return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) .flatMap { swarm -> AnyPublisher<[String: (hashes: [String], expiry: UInt64)], Error> in guard let snode: Snode = swarm.randomElement() else { return Fail(error: SnodeAPIError.generic) .eraseToAnyPublisher() } return SnodeAPI .send( request: SnodeRequest( endpoint: .expire, body: UpdateExpiryRequest( messageHashes: serverHashes, expiryMs: updatedExpiryMs, pubkey: publicKey, ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey, subkey: nil ) ), to: snode, associatedWith: publicKey, using: dependencies ) .decoded(as: UpdateExpiryResponse.self, using: dependencies) .flatMap { _, response -> AnyPublisher<[String: (hashes: [String], expiry: UInt64)], Error> in do { let result: [String: (hashes: [String], expiry: UInt64)] = try response.validResultMap( userX25519PublicKey: getUserHexEncodedPublicKey(), messageHashes: serverHashes, sodium: sodium.wrappedValue ) return Just(result) .setFailureType(to: Error.self) .eraseToAnyPublisher() } catch { return Fail(error: error) .eraseToAnyPublisher() } } .retry(maxRetryCount) .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } public static func revokeSubkey( publicKey: String, subkeyToRevoke: String, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher { guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } let publicKey: String = (Features.useTestnet ? publicKey.removingIdPrefixIfNeeded() : publicKey ) return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) .flatMap { swarm -> AnyPublisher in guard let snode: Snode = swarm.randomElement() else { return Fail(error: SnodeAPIError.generic) .eraseToAnyPublisher() } return SnodeAPI .send( request: SnodeRequest( endpoint: .revokeSubkey, body: RevokeSubkeyRequest( subkeyToRevoke: subkeyToRevoke, pubkey: publicKey, ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey ) ), to: snode, associatedWith: publicKey, using: dependencies ) .decoded(as: RevokeSubkeyResponse.self, using: dependencies) .flatMap { _, response -> AnyPublisher in do { try response.validateResult( userX25519PublicKey: getUserHexEncodedPublicKey(), subkeyToRevoke: subkeyToRevoke, sodium: sodium.wrappedValue ) } catch { return Fail(error: error) .eraseToAnyPublisher() } return Just(()) .setFailureType(to: Error.self) .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } // MARK: Delete public static func deleteMessages( publicKey: String, serverHashes: [String], using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<[String: Bool], Error> { guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } let publicKey: String = (Features.useTestnet ? publicKey.removingIdPrefixIfNeeded() : publicKey ) let userX25519PublicKey: String = getUserHexEncodedPublicKey() return getSwarm(for: publicKey) .subscribe(on: Threading.workQueue) .flatMap { swarm -> AnyPublisher<[String: Bool], Error> in Just(()) .setFailureType(to: Error.self) .flatMap { _ -> AnyPublisher in guard let snode: Snode = swarm.randomElement() else { return Fail(error: SnodeAPIError.generic) .eraseToAnyPublisher() } return Just(snode) .setFailureType(to: Error.self) .eraseToAnyPublisher() } .flatMap { snode -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( request: SnodeRequest( endpoint: .deleteMessages, body: DeleteMessagesRequest( messageHashes: serverHashes, requireSuccessfulDeletion: false, pubkey: userX25519PublicKey, ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey ) ), to: snode, associatedWith: publicKey, using: dependencies ) .subscribe(on: Threading.workQueue) .eraseToAnyPublisher() .decoded(as: DeleteMessagesResponse.self, using: dependencies) .map { _, response -> [String: Bool] in let validResultMap: [String: Bool] = response.validResultMap( userX25519PublicKey: userX25519PublicKey, serverHashes: serverHashes, sodium: sodium.wrappedValue ) // If at least one service node deleted successfully then we should // mark the hash as invalid so we don't try to fetch updates using // that hash going forward (if we do we would end up re-fetching // all old messages) if validResultMap.values.contains(true) { Storage.shared.writeAsync { db in try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( db, potentiallyInvalidHashes: serverHashes ) } } return validResultMap } .retry(maxRetryCount) .eraseToAnyPublisher() } .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. public static func deleteAllMessages( namespace: SnodeAPI.Namespace? = nil, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<[String: Bool], Error> { guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } let userX25519PublicKey: String = getUserHexEncodedPublicKey() return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) .flatMap { swarm -> AnyPublisher<[String: Bool], Error> in guard let snode: Snode = swarm.randomElement() else { return Fail(error: SnodeAPIError.generic) .eraseToAnyPublisher() } return getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( request: SnodeRequest( endpoint: .deleteAll, body: DeleteAllMessagesRequest( namespace: namespace, pubkey: userX25519PublicKey, timestampMs: timestampMs, ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey ) ), to: snode, associatedWith: nil, using: dependencies ) .decoded(as: DeleteAllMessagesResponse.self, using: dependencies) .map { _, response in let validResultMap: [String: Bool] = response.validResultMap( userX25519PublicKey: userX25519PublicKey, timestampMs: timestampMs, sodium: sodium.wrappedValue ) return validResultMap } .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } /// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation. public static func deleteAllMessages( beforeMs: UInt64, namespace: SnodeAPI.Namespace? = nil, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<[String: Bool], Error> { guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else { return Fail(error: SnodeAPIError.noKeyPair) .eraseToAnyPublisher() } let userX25519PublicKey: String = getUserHexEncodedPublicKey() return getSwarm(for: userX25519PublicKey) .subscribe(on: Threading.workQueue) .flatMap { swarm -> AnyPublisher<[String: Bool], Error> in guard let snode: Snode = swarm.randomElement() else { return Fail(error: SnodeAPIError.generic) .eraseToAnyPublisher() } return getNetworkTime(from: snode) .flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in SnodeAPI .send( request: SnodeRequest( endpoint: .deleteAllBefore, body: DeleteAllBeforeRequest( beforeMs: beforeMs, namespace: namespace, pubkey: userX25519PublicKey, timestampMs: timestampMs, ed25519PublicKey: userED25519KeyPair.publicKey, ed25519SecretKey: userED25519KeyPair.secretKey ) ), to: snode, associatedWith: nil, using: dependencies ) .decoded(as: DeleteAllBeforeResponse.self, using: dependencies) .map { _, response in let validResultMap: [String: Bool] = response.validResultMap( userX25519PublicKey: userX25519PublicKey, beforeMs: beforeMs, sodium: sodium.wrappedValue ) return validResultMap } .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } .retry(maxRetryCount) .eraseToAnyPublisher() } // MARK: - Internal API private static func getNetworkTime( from snode: Snode, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher { return SnodeAPI .send( request: SnodeRequest<[String: String]>( endpoint: .getInfo, body: [:] ), to: snode, associatedWith: nil ) .decoded(as: GetNetworkTimestampResponse.self, using: dependencies) .map { _, response in response.timestamp } .eraseToAnyPublisher() } internal static func getRandomSnode() -> AnyPublisher { // randomElement() uses the system's default random generator, which is cryptographically secure return getSnodePool() .map { $0.randomElement()! } .eraseToAnyPublisher() } private static func getSnodePoolFromSeedNode( dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher, Error> { let request: SnodeRequest = SnodeRequest( endpoint: .jsonGetNServiceNodes, body: GetServiceNodesRequest( activeOnly: true, limit: 256, fields: GetServiceNodesRequest.Fields( publicIp: true, storagePort: true, pubkeyEd25519: true, pubkeyX25519: true ) ) ) guard let target: String = seedNodePool.randomElement() else { return Fail(error: SnodeAPIError.snodePoolUpdatingFailed) .eraseToAnyPublisher() } guard let payload: Data = try? JSONEncoder().encode(request) else { return Fail(error: HTTPError.invalidJSON) .eraseToAnyPublisher() } SNLog("Populating snode pool using seed node: \(target).") return HTTP .execute( .post, "\(target)/json_rpc", body: payload, useSeedNodeURLSession: true ) .decoded(as: SnodePoolResponse.self, using: dependencies) .subscribe(on: Threading.workQueue) .mapError { error in switch error { case HTTPError.parsingFailed: return SnodeAPIError.snodePoolUpdatingFailed default: return error } } .map { snodePool -> Set in snodePool.result .serviceNodeStates .compactMap { $0.value } .asSet() } .retry(4) .handleEvents( receiveCompletion: { result in switch result { case .finished: SNLog("Got snode pool from seed node: \(target).") case .failure: SNLog("Failed to contact seed node at: \(target).") } } ) .eraseToAnyPublisher() } private static func getSnodePoolFromSnode( dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher, Error> { var snodePool = SnodeAPI.snodePool.wrappedValue var snodes: Set = [] (0..<3).forEach { _ in guard let snode = snodePool.randomElement() else { return } snodePool.remove(snode) snodes.insert(snode) } return Publishers .MergeMany( snodes .map { snode -> AnyPublisher, Error> in // Don't specify a limit in the request. Service nodes return a shuffled // list of nodes so if we specify a limit the 3 responses we get might have // very little overlap. SnodeAPI .send( request: SnodeRequest( endpoint: .oxenDaemonRPCCall, body: OxenDaemonRPCRequest( endpoint: .daemonGetServiceNodes, body: GetServiceNodesRequest( activeOnly: true, limit: nil, fields: GetServiceNodesRequest.Fields( publicIp: true, storagePort: true, pubkeyEd25519: true, pubkeyX25519: true ) ) ) ), to: snode, associatedWith: nil ) .decoded(as: SnodePoolResponse.self, using: dependencies) .mapError { error -> Error in switch error { case HTTPError.parsingFailed: return SnodeAPIError.snodePoolUpdatingFailed default: return error } } .map { _, snodePool -> Set in snodePool.result .serviceNodeStates .compactMap { $0.value } .asSet() } .retry(4) .eraseToAnyPublisher() } ) .collect() .flatMap { results -> AnyPublisher, Error> in let result: Set = results.reduce(Set()) { prev, next in prev.intersection(next) } // We want the snodes to agree on at least this many snodes guard result.count > 24 else { return Fail(error: SnodeAPIError.inconsistentSnodePools) .eraseToAnyPublisher() } // Limit the snode pool size to 256 so that we don't go too long without // refreshing it return Just(Set(result.prefix(256))) .setFailureType(to: Error.self) .eraseToAnyPublisher() } .eraseToAnyPublisher() } private static func send( request: SnodeRequest, to snode: Snode, associatedWith publicKey: String?, using dependencies: SSKDependencies = SSKDependencies() ) -> AnyPublisher<(ResponseInfoType, Data?), Error> { guard let payload: Data = try? JSONEncoder().encode(request) else { return Fail(error: HTTPError.invalidJSON) .eraseToAnyPublisher() } guard Features.useOnionRequests else { return HTTP .execute( .post, "\(snode.address):\(snode.port)/storage_rpc/v1", body: payload ) .map { response in (HTTP.ResponseInfo(code: -1, headers: [:]), response) } .mapError { error in switch error { case HTTPError.httpRequestFailed(let statusCode, let data): return (SnodeAPI.handleError(withStatusCode: statusCode, data: data, forSnode: snode, associatedWith: publicKey) ?? error) default: return error } } .eraseToAnyPublisher() } return dependencies.onionApi .sendOnionRequest( payload, to: snode ) .mapError { error in switch error { case HTTPError.httpRequestFailed(let statusCode, let data): return (SnodeAPI.handleError(withStatusCode: statusCode, data: data, forSnode: snode, associatedWith: publicKey) ?? error) default: return error } } .handleEvents( receiveOutput: { _, maybeData in // Extract and store hard fork information if returned guard let data: Data = maybeData, let snodeResponse: SnodeResponse = try? JSONDecoder() .decode(SnodeResponse.self, from: data) else { return } if snodeResponse.hardFork[1] > softfork { softfork = snodeResponse.hardFork[1] UserDefaults.standard[.softfork] = softfork } if snodeResponse.hardFork[0] > hardfork { hardfork = snodeResponse.hardFork[0] UserDefaults.standard[.hardfork] = hardfork softfork = snodeResponse.hardFork[1] UserDefaults.standard[.softfork] = softfork } } ) .eraseToAnyPublisher() } // MARK: - Parsing // The parsing utilities below use a best attempt approach to parsing; they warn for parsing // failures but don't throw exceptions. private static func parseSnodes(from responseData: Data?) -> Set { guard let responseData: Data = responseData, let responseJson: JSON = try? JSONSerialization.jsonObject( with: responseData, options: [ .fragmentsAllowed ] ) as? JSON else { SNLog("Failed to parse snodes from response data.") return [] } guard let rawSnodes = responseJson["snodes"] as? [JSON] else { SNLog("Failed to parse snodes from: \(responseJson).") return [] } guard let snodeData: Data = try? JSONSerialization.data(withJSONObject: rawSnodes, options: []) else { return [] } // FIXME: Hopefully at some point this different Snode structure will be deprecated and can be removed if let swarmSnodes: [SwarmSnode] = try? JSONDecoder().decode([Failable].self, from: snodeData).compactMap({ $0.value }), !swarmSnodes.isEmpty { return swarmSnodes.map { $0.toSnode() }.asSet() } return ((try? JSONDecoder().decode([Failable].self, from: snodeData)) ?? []) .compactMap { $0.value } .asSet() } // MARK: - Error Handling /// - Note: Should only be invoked from `Threading.workQueue` to avoid race conditions. @discardableResult internal static func handleError( withStatusCode statusCode: UInt, data: Data?, forSnode snode: Snode, associatedWith publicKey: String? = nil ) -> Error? { func handleBadSnode() { let oldFailureCount = (SnodeAPI.snodeFailureCount.wrappedValue[snode] ?? 0) let newFailureCount = oldFailureCount + 1 SnodeAPI.snodeFailureCount.mutate { $0[snode] = newFailureCount } SNLog("Couldn't reach snode at: \(snode); setting failure count to \(newFailureCount).") if newFailureCount >= SnodeAPI.snodeFailureThreshold { SNLog("Failure threshold reached for: \(snode); dropping it.") if let publicKey = publicKey { SnodeAPI.dropSnodeFromSwarmIfNeeded(snode, publicKey: publicKey) } SnodeAPI.dropSnodeFromSnodePool(snode) SNLog("Snode pool count: \(snodePool.wrappedValue.count).") SnodeAPI.snodeFailureCount.mutate { $0[snode] = 0 } } } switch statusCode { case 500, 502, 503: // The snode is unreachable handleBadSnode() case 404: // May caused by invalid open groups SNLog("Can't reach the server.") case 406: SNLog("The user's clock is out of sync with the service node network.") return SnodeAPIError.clockOutOfSync case 421: // The snode isn't associated with the given public key anymore if let publicKey = publicKey { func invalidateSwarm() { SNLog("Invalidating swarm for: \(publicKey).") SnodeAPI.dropSnodeFromSwarmIfNeeded(snode, publicKey: publicKey) } if let data: Data = data { let snodes = parseSnodes(from: data) if !snodes.isEmpty { setSwarm(to: snodes, for: publicKey) } else { invalidateSwarm() } } else { invalidateSwarm() } } else { SNLog("Got a 421 without an associated public key.") } default: handleBadSnode() SNLog("Unhandled response code: \(statusCode).") } return nil } }