import PromiseKit @objc(LKLongPoller) public final class LokiLongPoller : NSObject { private let onMessagesReceived: ([SSKProtoEnvelope]) -> Void private let storage = OWSPrimaryStorage.shared() private var hasStarted = false private var hasStopped = false private var connections = Set>() private var usedSnodes = Set() // MARK: Settings private let connectionCount = 3 private let retryInterval: TimeInterval = 4 // MARK: Convenience private var userHexEncodedPublicKey: String { return getUserHexEncodedPublicKey() } // MARK: Initialization @objc public init(onMessagesReceived: @escaping ([SSKProtoEnvelope]) -> Void) { self.onMessagesReceived = onMessagesReceived super.init() } // MARK: Public API @objc public func startIfNeeded() { guard !hasStarted else { return } print("[Loki] Started long polling.") hasStarted = true hasStopped = false openConnections() } @objc public func stopIfNeeded() { guard !hasStopped else { return } print("[Loki] Stopped long polling.") hasStarted = false hasStopped = true usedSnodes.removeAll() } // MARK: Private API private func openConnections() { guard !hasStopped else { return } LokiAPI.getSwarm(for: userHexEncodedPublicKey).then(on: DispatchQueue.global()) { [weak self] _ -> Guarantee<[Result]> in guard let strongSelf = self else { return Guarantee.value([Result]()) } strongSelf.usedSnodes.removeAll() let connections: [Promise] = (0...pending() strongSelf.openConnectionToNextSnode(seal: seal) return promise } strongSelf.connections = Set(connections) return when(resolved: connections) }.ensure(on: DispatchQueue.global()) { [weak self] in guard let strongSelf = self else { return } Timer.scheduledTimer(withTimeInterval: strongSelf.retryInterval, repeats: false) { _ in guard let strongSelf = self else { return } strongSelf.openConnections() } } } private func openConnectionToNextSnode(seal: Resolver) { let swarm = LokiAPI.swarmCache[userHexEncodedPublicKey] ?? [] let userHexEncodedPublicKey = self.userHexEncodedPublicKey let unusedSnodes = Set(swarm).subtracting(usedSnodes) if !unusedSnodes.isEmpty { let nextSnode = unusedSnodes.randomElement()! usedSnodes.insert(nextSnode) print("[Loki] Opening long polling connection to \(nextSnode).") longPoll(nextSnode, seal: seal).catch(on: LokiAPI.errorHandlingQueue) { [weak self] error in print("[Loki] Long polling connection to \(nextSnode) failed; dropping it and switching to next snode.") LokiAPI.dropIfNeeded(nextSnode, hexEncodedPublicKey: userHexEncodedPublicKey) self?.openConnectionToNextSnode(seal: seal) } } else { seal.fulfill(()) } } private func longPoll(_ target: LokiAPITarget, seal: Resolver) -> Promise { return LokiAPI.getRawMessages(from: target, usingLongPolling: true).then(on: DispatchQueue.global()) { [weak self] rawResponse -> Promise in guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) } let messages = LokiAPI.parseRawMessagesResponse(rawResponse, from: target) let hexEncodedPublicKeys = Set(messages.compactMap { $0.source }) let promises = hexEncodedPublicKeys.map { LokiAPI.getDestinations(for: $0) } return when(resolved: promises).then(on: DispatchQueue.global()) { _ -> Promise in guard let strongSelf = self, !strongSelf.hasStopped else { return Promise.value(()) } strongSelf.onMessagesReceived(messages) return strongSelf.longPoll(target, seal: seal) } } } }