Fix background polling

This commit is contained in:
Niels Andriesse 2020-12-07 11:21:24 +11:00
parent 21ec051016
commit 438bbccdfa
7 changed files with 80 additions and 19 deletions

View File

@ -1,23 +1,23 @@
import PromiseKit
import SessionSnodeKit
@objc(LKBackgroundPoller)
public final class BackgroundPoller : NSObject {
private static var closedGroupPoller: ClosedGroupPoller!
private static var promises: [Promise<Void>] = []
private override init() { }
@objc(pollWithCompletionHandler:)
public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) {
var promises: [Promise<Void>] = []
// TODO TODO TODO
// promises.append(AppEnvironment.shared.messageFetcherJob.run()) // FIXME: It'd be nicer to just use Poller directly
closedGroupPoller = ClosedGroupPoller()
promises.append(contentsOf: closedGroupPoller.pollOnce())
promises = []
promises.append(pollForMessages())
promises.append(contentsOf: pollForClosedGroupMessages())
let openGroups: [String:OpenGroup] = Storage.shared.getAllUserOpenGroups()
openGroups.values.forEach { openGroup in
let poller = OpenGroupPoller(for: openGroup)
poller.stop()
promises.append(poller.pollForNewMessages())
promises.append(poller.pollForNewMessages(isBackgroundPoll: true))
}
when(resolved: promises).done { _ in
completionHandler(.newData)
@ -25,4 +25,32 @@ public final class BackgroundPoller : NSObject {
completionHandler(.failed)
}
}
private static func pollForMessages() -> Promise<Void> {
let userPublicKey = getUserHexEncodedPublicKey()
return getMessages(for: userPublicKey)
}
private static func pollForClosedGroupMessages() -> [Promise<Void>] {
let publicKeys = Storage.shared.getUserClosedGroupPublicKeys()
return publicKeys.map { getMessages(for: $0) }
}
private static func getMessages(for publicKey: String) -> Promise<Void> {
return SnodeAPI.getSwarm(for: publicKey).then2 { swarm -> Promise<Void> in
guard let snode = swarm.randomElement() else { throw SnodeAPI.Error.generic }
return SnodeAPI.getRawMessages(from: snode, associatedWith: publicKey).then(on: DispatchQueue.main) { rawResponse -> Promise<Void> in
let messages = SnodeAPI.parseRawMessagesResponse(rawResponse, from: snode, associatedWith: publicKey)
let promises = messages.compactMap { json -> Promise<Void>? in
// Use a best attempt approach here; we don't want to fail the entire process if one of the
// messages failed to parse.
guard let envelope = SNProtoEnvelope.from(json),
let data = try? envelope.serializedData() else { return nil }
let job = MessageReceiveJob(data: data, isBackgroundPoll: true)
return job.execute()
}
return when(fulfilled: promises) // The promise returned by MessageReceiveJob never rejects
}
}
}
}

View File

@ -1,9 +1,11 @@
import SessionUtilitiesKit
import PromiseKit
public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSCoding conformance is needed for YapDatabase compatibility
public let data: Data
public let openGroupMessageServerID: UInt64?
public let openGroupID: String?
public let isBackgroundPoll: Bool
public var delegate: JobDelegate?
public var id: String?
public var failureCount: UInt = 0
@ -13,10 +15,11 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
public static let maxFailureCount: UInt = 10
// MARK: Initialization
public init(data: Data, openGroupMessageServerID: UInt64? = nil, openGroupID: String? = nil) {
public init(data: Data, openGroupMessageServerID: UInt64? = nil, openGroupID: String? = nil, isBackgroundPoll: Bool) {
self.data = data
self.openGroupMessageServerID = openGroupMessageServerID
self.openGroupID = openGroupID
self.isBackgroundPoll = isBackgroundPoll
#if DEBUG
if openGroupMessageServerID != nil { assert(openGroupID != nil) }
if openGroupID != nil { assert(openGroupMessageServerID != nil) }
@ -26,10 +29,12 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
// MARK: Coding
public init?(coder: NSCoder) {
guard let data = coder.decodeObject(forKey: "data") as! Data?,
let id = coder.decodeObject(forKey: "id") as! String? else { return nil }
let id = coder.decodeObject(forKey: "id") as! String?,
let isBackgroundPoll = coder.decodeObject(forKey: "isBackgroundPoll") as! Bool? else { return nil }
self.data = data
self.openGroupMessageServerID = coder.decodeObject(forKey: "openGroupMessageServerID") as! UInt64?
self.openGroupID = coder.decodeObject(forKey: "openGroupID") as! String?
self.isBackgroundPoll = isBackgroundPoll
self.id = id
self.failureCount = coder.decodeObject(forKey: "failureCount") as! UInt? ?? 0
}
@ -38,17 +43,24 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
coder.encode(data, forKey: "data")
coder.encode(openGroupMessageServerID, forKey: "openGroupMessageServerID")
coder.encode(openGroupID, forKey: "openGroupID")
coder.encode(isBackgroundPoll, forKey: "isBackgroundPoll")
coder.encode(id, forKey: "id")
coder.encode(failureCount, forKey: "failureCount")
}
// MARK: Running
public func execute() {
let _: Promise<Void> = execute()
}
public func execute() -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
SNMessagingKitConfiguration.shared.storage.withAsync({ transaction in // Intentionally capture self
do {
let (message, proto) = try MessageReceiver.parse(self.data, openGroupMessageServerID: self.openGroupMessageServerID, using: transaction)
try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, using: transaction)
try MessageReceiver.handle(message, associatedWithProto: proto, openGroupID: self.openGroupID, isBackgroundPoll: self.isBackgroundPoll, using: transaction)
self.handleSuccess()
seal.fulfill(())
} catch {
SNLog("Couldn't receive message due to error: \(error).")
if let error = error as? MessageReceiver.Error, !error.isRetryable {
@ -56,8 +68,10 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
} else {
self.handleFailure(error: error)
}
seal.fulfill(()) // The promise is just used to keep track of when we're done
}
}, completion: { })
return promise
}
private func handleSuccess() {

View File

@ -7,13 +7,13 @@ extension MessageReceiver {
return SSKEnvironment.shared.blockingManager.isRecipientIdBlocked(publicKey)
}
public static func handle(_ message: Message, associatedWithProto proto: SNProtoContent, openGroupID: String?, using transaction: Any) throws {
public static func handle(_ message: Message, associatedWithProto proto: SNProtoContent, openGroupID: String?, isBackgroundPoll: Bool, using transaction: Any) throws {
switch message {
case let message as ReadReceipt: handleReadReceipt(message, using: transaction)
case let message as TypingIndicator: handleTypingIndicator(message, using: transaction)
case let message as ClosedGroupUpdate: handleClosedGroupUpdate(message, using: transaction)
case let message as ExpirationTimerUpdate: handleExpirationTimerUpdate(message, using: transaction)
case let message as VisibleMessage: try handleVisibleMessage(message, associatedWithProto: proto, openGroupID: openGroupID, using: transaction)
case let message as VisibleMessage: try handleVisibleMessage(message, associatedWithProto: proto, openGroupID: openGroupID, isBackgroundPoll: isBackgroundPoll, using: transaction)
default: fatalError()
}
}
@ -136,7 +136,7 @@ extension MessageReceiver {
}
@discardableResult
public static func handleVisibleMessage(_ message: VisibleMessage, associatedWithProto proto: SNProtoContent, openGroupID: String?, using transaction: Any) throws -> String {
public static func handleVisibleMessage(_ message: VisibleMessage, associatedWithProto proto: SNProtoContent, openGroupID: String?, isBackgroundPoll: Bool, using transaction: Any) throws -> String {
let storage = SNMessagingKitConfiguration.shared.storage
let transaction = transaction as! YapDatabaseReadWriteTransaction
var isMainAppAndActive = false
@ -206,7 +206,7 @@ extension MessageReceiver {
cancelTypingIndicatorsIfNeeded(for: message.sender!)
}
// Notify the user if needed
guard isMainAppAndActive, let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction),
guard (isMainAppAndActive || isBackgroundPoll), let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction),
let thread = TSThread.fetch(uniqueId: threadID, transaction: transaction) else { return tsIncomingMessageID }
SSKEnvironment.shared.notificationsManager!.notifyUser(for: tsIncomingMessage, in: thread, transaction: transaction)
return tsIncomingMessageID

View File

@ -67,7 +67,7 @@ public final class ClosedGroupPoller : NSObject {
guard let envelope = SNProtoEnvelope.from(json) else { return }
do {
let data = try envelope.serializedData()
let job = MessageReceiveJob(data: data)
let job = MessageReceiveJob(data: data, isBackgroundPoll: false)
Storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
}

View File

@ -51,11 +51,18 @@ public final class OpenGroupPoller : NSObject {
@discardableResult
public func pollForNewMessages() -> Promise<Void> {
return pollForNewMessages(isBackgroundPoll: false)
}
@discardableResult
public func pollForNewMessages(isBackgroundPoll: Bool) -> Promise<Void> {
guard !self.isPolling else { return Promise.value(()) }
self.isPolling = true
let openGroup = self.openGroup
let userPublicKey = getUserHexEncodedPublicKey()
return OpenGroupAPI.getMessages(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { messages in
let (promise, seal) = Promise<Void>.pending()
promise.retainUntilComplete()
OpenGroupAPI.getMessages(for: openGroup.channel, on: openGroup.server).done(on: DispatchQueue.global(qos: .default)) { messages in
self.isPolling = false
// Sorting the messages by timestamp before importing them fixes an issue where messages that quote older messages can't find those older messages
messages.sorted { $0.serverTimestamp < $1.serverTimestamp }.forEach { message in
@ -153,11 +160,23 @@ public final class OpenGroupPoller : NSObject {
Storage.write { transaction in
Storage.shared.setOpenGroupDisplayName(to: senderDisplayName, for: senderPublicKey, inOpenGroupWithID: openGroup.id, using: transaction)
let messageServerID = message.serverID
let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id)
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
let job = MessageReceiveJob(data: try! envelope.buildSerializedData(), openGroupMessageServerID: messageServerID, openGroupID: openGroup.id, isBackgroundPoll: isBackgroundPoll)
if isBackgroundPoll {
job.execute().done(on: DispatchQueue.global(qos: .userInitiated)) {
seal.fulfill(())
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in
seal.fulfill(()) // The promise is just used to keep track of when we're done
}
} else {
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
seal.fulfill(())
}
}
}
}.catch(on: DispatchQueue.global(qos: .userInitiated)) { _ in
seal.fulfill(()) // The promise is just used to keep track of when we're done
}
return promise
}
private func pollForDeletedMessages() {

View File

@ -97,7 +97,7 @@ public final class Poller : NSObject {
guard let envelope = SNProtoEnvelope.from(json) else { return }
do {
let data = try envelope.serializedData()
let job = MessageReceiveJob(data: data)
let job = MessageReceiveJob(data: data, isBackgroundPoll: false)
Storage.write { transaction in
SessionMessagingKit.JobQueue.shared.add(job, using: transaction)
}

View File

@ -43,7 +43,7 @@ public final class NotificationServiceExtension : UNNotificationServiceExtension
var userInfo: [String:Any] = [ NotificationServiceExtension.isFromRemoteKey : true ]
switch message {
case let visibleMessage as VisibleMessage:
let tsIncomingMessageID = try MessageReceiver.handleVisibleMessage(visibleMessage, associatedWithProto: proto, openGroupID: nil, using: transaction)
let tsIncomingMessageID = try MessageReceiver.handleVisibleMessage(visibleMessage, associatedWithProto: proto, openGroupID: nil, isBackgroundPoll: false, using: transaction)
guard let tsIncomingMessage = TSIncomingMessage.fetch(uniqueId: tsIncomingMessageID, transaction: transaction) else {
return self.handleFailure(for: notificationContent)
}