// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.

import Foundation
import GRDB
import PromiseKit
import SessionSnodeKit
import SessionMessagingKit
import SessionUtilitiesKit

/// Runs a single round of message polling on behalf of a background fetch.
///
/// One poll covers the current user's own swarm, every closed group the user is still a
/// member of, and every active open group server with a non-empty room token.
public final class BackgroundPoller {
    /// The promises started by the most recent call to `poll(completionHandler:)`.
    private static var promises: [Promise<Void>] = []

    /// Whether the current background poll may still deliver results.
    ///
    /// This is checked before the completion handler is called and before received messages
    /// are processed. Nothing in this file sets it, so it stays `false` unless set elsewhere.
    /// NOTE(review): presumably the owner of the background fetch sets it to `true` before
    /// calling `poll` and back to `false` on timeout — confirm against the caller.
    public static var isValid: Bool = false

    /// Starts all background polls and reports the outcome once every one of them has settled.
    ///
    /// - Parameter completionHandler: Called with `.newData` once all polls have settled, or
    ///   with `.failed` if the chain rejects. It is not called at all when `isValid` is
    ///   `false` at that point.
    public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
        promises = []
            .appending(pollForMessages())
            .appending(contentsOf: pollForClosedGroupMessages())
            .appending(
                contentsOf: Storage.shared
                    .read { db in
                        // The default room promise creates an OpenGroup with an empty `roomToken` value,
                        // we don't want to start a poller for this as the user hasn't actually joined a room
                        try OpenGroup
                            .select(.server)
                            .filter(OpenGroup.Columns.roomToken != "")
                            .filter(OpenGroup.Columns.isActive)
                            .distinct()
                            .asRequest(of: String.self)
                            .fetchSet(db)
                    }
                    .defaulting(to: [])
                    .map { server in
                        let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server)
                        // NOTE(review): presumably this ensures the new poller has no recurring
                        // poll running before the one-off poll below — confirm
                        poller.stop()

                        return poller.poll(
                            isBackgroundPoll: true,
                            isBackgroundPollerValid: { BackgroundPoller.isValid },
                            isPostCapabilitiesRetry: false
                        )
                    }
            )

        // NOTE(review): PromiseKit documents `when(resolved:)` as waiting for every promise
        // regardless of outcome, so individual poll failures presumably never reach the
        // `catch` below — confirm whether `.failed` is expected to be reported for them
        when(resolved: promises)
            .done { _ in
                // If we have already invalidated the timer then do nothing (we essentially timed out)
                guard BackgroundPoller.isValid else { return }

                completionHandler(.newData)
            }
            .catch { error in
                // If we have already invalidated the timer then do nothing (we essentially timed out)
                guard BackgroundPoller.isValid else { return }

                SNLog("Background poll failed due to error: \(error)")
                completionHandler(.failed)
            }
    }

    /// Polls for messages addressed to the current user's public key.
    ///
    /// - Returns: The promise produced by `getMessages(for:)` for the current user.
    private static func pollForMessages() -> Promise<Void> {
        let userPublicKey: String = getUserHexEncodedPublicKey()

        return getMessages(for: userPublicKey)
    }

    /// Starts one background poll per closed group that the current user belongs to.
    ///
    /// - Returns: One promise per matching closed group, or an empty array when the database
    ///   read yields no result.
    private static func pollForClosedGroupMessages() -> [Promise<Void>] {
        // Fetch all closed groups (excluding any that don't contain the current user as a
        // GroupMember as the user is no longer a member of those)
        return Storage.shared
            .read { db in
                try ClosedGroup
                    .select(.threadId)
                    .joining(
                        required: ClosedGroup.members
                            .filter(GroupMember.Columns.profileId == getUserHexEncodedPublicKey(db))
                    )
                    .asRequest(of: String.self)
                    .fetchAll(db)
            }
            .defaulting(to: [])
            .map { groupPublicKey in
                ClosedGroupPoller.poll(
                    groupPublicKey,
                    on: DispatchQueue.main,
                    maxRetryCount: 0,
                    isBackgroundPoll: true,
                    isBackgroundPollValid: { BackgroundPoller.isValid }
                )
            }
    }

    /// Fetches the messages stored for `publicKey` and runs a receive job for each thread.
    ///
    /// The messages are requested from a randomly chosen node of the key's swarm, processed
    /// inside a single database write, grouped by thread and turned into `messageReceive`
    /// jobs. Each job is added to the `JobRunner` and then run directly.
    ///
    /// - Parameter publicKey: The public key whose swarm should be queried.
    /// - Returns: A promise that fulfils once every created job has reported success, failure
    ///   or deferral. It fulfils immediately when there are no messages or `isValid` is
    ///   `false`, and rejects with `SnodeAPIError.generic` when the swarm is empty.
    private static func getMessages(for publicKey: String) -> Promise<Void> {
        return SnodeAPI.getSwarm(for: publicKey)
            .then(on: DispatchQueue.main) { swarm -> Promise<Void> in
                guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }

                return SnodeAPI.getMessages(from: snode, associatedWith: publicKey)
                    .then(on: DispatchQueue.main) { messages -> Promise<Void> in
                        guard !messages.isEmpty, BackgroundPoller.isValid else { return Promise.value(()) }

                        var jobsToRun: [Job] = []

                        Storage.shared.write { db in
                            messages
                                .compactMap { message -> ProcessedMessage? in
                                    do {
                                        return try Message.processRawReceivedMessage(db, rawMessage: message)
                                    }
                                    catch {
                                        switch error {
                                            // Ignore duplicate & selfSend message errors (and don't bother
                                            // logging them as there will be a lot since each service node
                                            // duplicates messages)
                                            case DatabaseError.SQLITE_CONSTRAINT_UNIQUE,
                                                MessageReceiverError.duplicateMessage,
                                                MessageReceiverError.duplicateControlMessage,
                                                MessageReceiverError.selfSend:
                                                break

                                            // In the background ignore 'SQLITE_ABORT' (it generally means
                                            // the BackgroundPoller has timed out)
                                            case DatabaseError.SQLITE_ABORT: break

                                            default: SNLog("Failed to deserialize envelope due to error: \(error).")
                                        }

                                        return nil
                                    }
                                }
                                .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) }
                                .forEach { threadId, threadMessages in
                                    let maybeJob: Job? = Job(
                                        variant: .messageReceive,
                                        behaviour: .runOnce,
                                        threadId: threadId,
                                        details: MessageReceiveJob.Details(
                                            messages: threadMessages.map { $0.messageInfo },
                                            isBackgroundPoll: true
                                        )
                                    )

                                    guard let job: Job = maybeJob else { return }

                                    // Add to the JobRunner so they are persistent and will retry on
                                    // the next app run if they fail
                                    JobRunner.add(db, job: job, canStartJob: false)
                                    jobsToRun.append(job)
                                }
                        }

                        let promises: [Promise<Void>] = jobsToRun.map { job -> Promise<Void> in
                            let (promise, seal) = Promise<Void>.pending()

                            // Note: In the background we just want jobs to fail silently
                            MessageReceiveJob.run(
                                job,
                                queue: DispatchQueue.main,
                                success: { _, _ in seal.fulfill(()) },
                                failure: { _, _, _ in seal.fulfill(()) },
                                deferred: { _ in seal.fulfill(()) }
                            )

                            return promise
                        }

                        return when(fulfilled: promises)
                    }
            }
    }
}