Progress handling messages in the notification extension
Turned the processing of DeadlockWorkAround messages into a job Updated the logic to trigger the DeadlockWorkAroundJob after sharing content from within the app Updated the logic to persist prepared quote and linkPreview attachments Updated the JobRunner to support blocking "on active" jobs in some cases Fixed an issue where the Share Extension could fail due to not having a snode pool loaded Fixed an issue where tapping a remote notification wasn't opening the conversation
This commit is contained in:
parent
55975fef40
commit
2fde3eb691
|
@ -638,6 +638,8 @@
|
|||
FD5931A32A89FBBB0040147D /* DeadlockWorkAround.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5931A22A89FBBB0040147D /* DeadlockWorkAround.swift */; };
|
||||
FD5931A72A8DA5DA0040147D /* SQLInterpolation+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5931A62A8DA5DA0040147D /* SQLInterpolation+Utilities.swift */; };
|
||||
FD5931AB2A8DCB0A0040147D /* ScopeAdapter+Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5931AA2A8DCB0A0040147D /* ScopeAdapter+Utilities.swift */; };
|
||||
FD5931AD2A92D7820040147D /* ProcessDeadlockWorkAroundJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5931AC2A92D7820040147D /* ProcessDeadlockWorkAroundJob.swift */; };
|
||||
FD5931AF2A92D9320040147D /* _016_AddDeadlockWorkAroundJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5931AE2A92D9320040147D /* _016_AddDeadlockWorkAroundJob.swift */; };
|
||||
FD5C72F7284F0E560029977D /* MessageReceiver+ReadReceipts.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5C72F6284F0E560029977D /* MessageReceiver+ReadReceipts.swift */; };
|
||||
FD5C72F9284F0E880029977D /* MessageReceiver+TypingIndicators.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5C72F8284F0E880029977D /* MessageReceiver+TypingIndicators.swift */; };
|
||||
FD5C72FB284F0EA10029977D /* MessageReceiver+DataExtractionNotification.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD5C72FA284F0EA10029977D /* MessageReceiver+DataExtractionNotification.swift */; };
|
||||
|
@ -1761,6 +1763,8 @@
|
|||
FD5931A22A89FBBB0040147D /* DeadlockWorkAround.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DeadlockWorkAround.swift; sourceTree = "<group>"; };
|
||||
FD5931A62A8DA5DA0040147D /* SQLInterpolation+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SQLInterpolation+Utilities.swift"; sourceTree = "<group>"; };
|
||||
FD5931AA2A8DCB0A0040147D /* ScopeAdapter+Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "ScopeAdapter+Utilities.swift"; sourceTree = "<group>"; };
|
||||
FD5931AC2A92D7820040147D /* ProcessDeadlockWorkAroundJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ProcessDeadlockWorkAroundJob.swift; sourceTree = "<group>"; };
|
||||
FD5931AE2A92D9320040147D /* _016_AddDeadlockWorkAroundJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _016_AddDeadlockWorkAroundJob.swift; sourceTree = "<group>"; };
|
||||
FD5C72F6284F0E560029977D /* MessageReceiver+ReadReceipts.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "MessageReceiver+ReadReceipts.swift"; sourceTree = "<group>"; };
|
||||
FD5C72F8284F0E880029977D /* MessageReceiver+TypingIndicators.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "MessageReceiver+TypingIndicators.swift"; sourceTree = "<group>"; };
|
||||
FD5C72FA284F0EA10029977D /* MessageReceiver+DataExtractionNotification.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "MessageReceiver+DataExtractionNotification.swift"; sourceTree = "<group>"; };
|
||||
|
@ -3615,6 +3619,7 @@
|
|||
FD8ECF7C2934293A00C0D1BB /* _013_SessionUtilChanges.swift */,
|
||||
FD778B6329B189FF001BAC6B /* _014_GenerateInitialUserConfigDumps.swift */,
|
||||
FD1D732D2A86114600E3F410 /* _015_BlockCommunityMessageRequests.swift */,
|
||||
FD5931AE2A92D9320040147D /* _016_AddDeadlockWorkAroundJob.swift */,
|
||||
);
|
||||
path = Migrations;
|
||||
sourceTree = "<group>";
|
||||
|
@ -4339,6 +4344,7 @@
|
|||
C352A35A2557824E00338F3E /* AttachmentUploadJob.swift */,
|
||||
7B521E0929BFF84400C3C36A /* GroupLeavingJob.swift */,
|
||||
FD2B4AFE2946C93200AB4848 /* ConfigurationSyncJob.swift */,
|
||||
FD5931AC2A92D7820040147D /* ProcessDeadlockWorkAroundJob.swift */,
|
||||
);
|
||||
path = Types;
|
||||
sourceTree = "<group>";
|
||||
|
@ -5899,6 +5905,7 @@
|
|||
C32C5A88256DBCF9003C73A2 /* MessageReceiver+ClosedGroups.swift in Sources */,
|
||||
B8D0A25925E367AC00C1835E /* Notification+MessageReceiver.swift in Sources */,
|
||||
FD245C53285065DB00B966DD /* ProximityMonitoringManager.swift in Sources */,
|
||||
FD5931AD2A92D7820040147D /* ProcessDeadlockWorkAroundJob.swift in Sources */,
|
||||
FD245C55285065E500B966DD /* OpenGroupManager.swift in Sources */,
|
||||
FDC4387227B5BB3B00C60D73 /* FileUploadResponse.swift in Sources */,
|
||||
C32C599E256DB02B003C73A2 /* TypingIndicators.swift in Sources */,
|
||||
|
@ -5922,6 +5929,7 @@
|
|||
FD8ECF7F2934298100C0D1BB /* ConfigDump.swift in Sources */,
|
||||
FDA1E83B29A5F2D500C5C3BD /* SessionUtil+Shared.swift in Sources */,
|
||||
C352A2FF25574B6300338F3E /* MessageSendJob.swift in Sources */,
|
||||
FD5931AF2A92D9320040147D /* _016_AddDeadlockWorkAroundJob.swift in Sources */,
|
||||
FD16AB612A1DD9B60083D849 /* ProfilePictureView+Convenience.swift in Sources */,
|
||||
FD23CE242A675C440000B97C /* Crypto+SessionMessagingKit.swift in Sources */,
|
||||
B8856D11256F112A001CE70E /* OWSAudioSession.swift in Sources */,
|
||||
|
|
|
@ -980,6 +980,7 @@ extension ConversationVC:
|
|||
|
||||
// Otherwise share the file
|
||||
let shareVC = UIActivityViewController(activityItems: [ fileUrl ], applicationActivities: nil)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
|
|
|
@ -654,6 +654,7 @@ private final class EnterPublicKeyVC: UIViewController {
|
|||
|
||||
@objc private func sharePublicKey() {
|
||||
let shareVC = UIActivityViewController(activityItems: [ getUserHexEncodedPublicKey() ], applicationActivities: nil)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
|
|
|
@ -150,6 +150,7 @@ extension AllMediaViewController: UIDocumentInteractionControllerDelegate {
|
|||
extension AllMediaViewController: DocumentTileViewControllerDelegate {
|
||||
public func share(fileUrl: URL) {
|
||||
let shareVC = UIActivityViewController(activityItems: [ fileUrl ], applicationActivities: nil)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
|
|
|
@ -519,6 +519,7 @@ class MediaPageViewController: UIPageViewController, UIPageViewControllerDataSou
|
|||
}
|
||||
|
||||
let shareVC = UIActivityViewController(activityItems: [ URL(fileURLWithPath: originalFilePath) ], applicationActivities: nil)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
|
|
|
@ -568,8 +568,10 @@ class NotificationActionHandler {
|
|||
.eraseToAnyPublisher()
|
||||
}
|
||||
|
||||
return Storage.shared
|
||||
.writePublisher { db in
|
||||
// Note: This will actually be executed within the main app (rather than an extension) so can just
|
||||
// follow the standard sending process
|
||||
return dependencies.storage
|
||||
.writePublisher { db -> Job? in
|
||||
let interaction: Interaction = try Interaction(
|
||||
threadId: threadId,
|
||||
authorId: getUserHexEncodedPublicKey(db),
|
||||
|
@ -598,23 +600,38 @@ class NotificationActionHandler {
|
|||
)
|
||||
)
|
||||
|
||||
preconditionFailure("") // TODO: Need to refactor this similar to the share extension
|
||||
// return try MessageSender.preparedSendData(
|
||||
// db,
|
||||
// interaction: interaction,
|
||||
// preparedAttachments: nil,
|
||||
// threadId: threadId,
|
||||
// threadVariant: thread.variant,
|
||||
// using: dependencies
|
||||
// )
|
||||
return try MessageSender.send(
|
||||
db,
|
||||
interaction: interaction,
|
||||
threadId: thread.id,
|
||||
threadVariant: thread.variant,
|
||||
using: dependencies
|
||||
)
|
||||
}
|
||||
.flatMap { job in
|
||||
Deferred {
|
||||
Future { resolution in
|
||||
guard let job: Job = job else {
|
||||
return resolution(Result.failure(JobRunnerError.missingRequiredDetails))
|
||||
}
|
||||
|
||||
MessageSendJob.run(
|
||||
job,
|
||||
queue: DispatchQueue.global(qos: .background),
|
||||
success: { _, _, _ in resolution(Result.success(())) },
|
||||
failure: { _, error, _, _ in resolution(Result.failure(error ?? HTTPError.generic)) },
|
||||
deferred: { _, _ in resolution(Result.success(())) },
|
||||
using: dependencies
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
.flatMap { MessageSender.sendImmediate(data: $0, using: dependencies) }
|
||||
.handleEvents(
|
||||
receiveCompletion: { result in
|
||||
switch result {
|
||||
case .finished: break
|
||||
case .failure:
|
||||
Storage.shared.read { [weak self] db in
|
||||
dependencies.storage.read { [weak self] db in
|
||||
self?.notificationPresenter.notifyForFailedSend(
|
||||
db,
|
||||
in: thread,
|
||||
|
|
|
@ -265,7 +265,6 @@ public class UserNotificationActionHandler: NSObject {
|
|||
case .finished: break
|
||||
case .failure(let error):
|
||||
completionHandler()
|
||||
owsFailDebug("error: \(error)")
|
||||
Logger.error("error: \(error)")
|
||||
}
|
||||
},
|
||||
|
|
|
@ -195,7 +195,9 @@ class HelpViewModel: SessionTableViewModel<NoNav, HelpViewModel.Section, HelpVie
|
|||
activityItems: [ URL(fileURLWithPath: latestLogFilePath) ],
|
||||
applicationActivities: nil
|
||||
)
|
||||
shareVC.completionWithItemsHandler = { _, _, _, _ in onShareComplete?() }
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC) { _ in
|
||||
onShareComplete?()
|
||||
}
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
|
@ -266,7 +268,7 @@ class HelpViewModel: SessionTableViewModel<NoNav, HelpViewModel.Section, HelpVie
|
|||
],
|
||||
applicationActivities: nil
|
||||
)
|
||||
shareVC.completionWithItemsHandler = { [weak self] _, completed, _, _ in
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC) { [weak self] completed in
|
||||
guard
|
||||
completed &&
|
||||
generatedPassword == self?.databaseKeyEncryptionPassword
|
||||
|
|
|
@ -274,6 +274,8 @@ private final class ViewMyQRCodeVC : UIViewController {
|
|||
@objc private func shareQRCode() {
|
||||
let qrCode = QRCode.generate(for: getUserHexEncodedPublicKey(), hasBackground: true)
|
||||
let shareVC = UIActivityViewController(activityItems: [ qrCode ], applicationActivities: nil)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
if UIDevice.current.isIPad {
|
||||
shareVC.excludedActivityTypes = []
|
||||
shareVC.popoverPresentationController?.permittedArrowDirections = []
|
||||
|
|
|
@ -684,6 +684,7 @@ class SettingsViewModel: SessionTableViewModel<SettingsViewModel.NavButton, Sett
|
|||
activityItems: [ sessionId ],
|
||||
applicationActivities: nil
|
||||
)
|
||||
shareVC.completionWithItemsHandler = ProcessDeadlockWorkAroundJob.afterAppShare(shareVC)
|
||||
|
||||
self.transitionToScreen(shareVC, transitionType: .present)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@ public enum SNMessagingKit: MigratableTarget { // Just to make the external API
|
|||
_012_AddFTSIfNeeded.self,
|
||||
_013_SessionUtilChanges.self,
|
||||
_014_GenerateInitialUserConfigDumps.self,
|
||||
_015_BlockCommunityMessageRequests.self
|
||||
_015_BlockCommunityMessageRequests.self,
|
||||
_016_AddDeadlockWorkAroundJob.self
|
||||
]
|
||||
]
|
||||
)
|
||||
|
@ -55,5 +56,6 @@ public enum SNMessagingKit: MigratableTarget { // Just to make the external API
|
|||
JobRunner.setExecutor(AttachmentDownloadJob.self, for: .attachmentDownload)
|
||||
JobRunner.setExecutor(ConfigurationSyncJob.self, for: .configurationSync)
|
||||
JobRunner.setExecutor(ConfigMessageReceiveJob.self, for: .configMessageReceive)
|
||||
JobRunner.setExecutor(ProcessDeadlockWorkAroundJob.self, for: .processDeadlockWorkAround)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
|
||||
|
||||
import Foundation
|
||||
import GRDB
|
||||
import SessionUtilitiesKit
|
||||
import SessionSnodeKit
|
||||
|
||||
/// This migration adds the `DeadLockWorkAroundJob` to run when the app becomes active
|
||||
enum _016_AddDeadlockWorkAroundJob: Migration {
|
||||
static let target: TargetMigrations.Identifier = .messagingKit
|
||||
static let identifier: String = "AddDeadlockWorkAroundJob"
|
||||
static let needsConfigSync: Bool = false
|
||||
static let minExpectedRunDuration: TimeInterval = 0.1
|
||||
|
||||
static func migrate(_ db: Database) throws {
|
||||
_ = try Job(
|
||||
variant: .processDeadlockWorkAround,
|
||||
behaviour: .recurringOnActive,
|
||||
shouldBlock: true
|
||||
).migrationSafeInserted(db)
|
||||
|
||||
Storage.update(progress: 1, for: self, in: target) // In case this is the last migration
|
||||
}
|
||||
}
|
|
@ -77,7 +77,13 @@ public struct LinkPreview: Codable, Equatable, Hashable, FetchableRecord, Persis
|
|||
// MARK: - Protobuf
|
||||
|
||||
public extension LinkPreview {
|
||||
init?(_ db: Database, proto: SNProtoDataMessage, body: String?, sentTimestampMs: TimeInterval) throws {
|
||||
init?(
|
||||
_ db: Database,
|
||||
proto: SNProtoDataMessage,
|
||||
body: String?,
|
||||
sentTimestampMs: TimeInterval,
|
||||
preparedAttachments: [String: Attachment]?
|
||||
) throws {
|
||||
guard let previewProto = proto.preview.first else { throw LinkPreviewError.noPreview }
|
||||
guard URL(string: previewProto.url) != nil else { throw LinkPreviewError.invalidInput }
|
||||
guard LinkPreview.isValidLinkUrl(previewProto.url) else { throw LinkPreviewError.invalidInput }
|
||||
|
@ -102,8 +108,8 @@ public extension LinkPreview {
|
|||
self.title = LinkPreview.normalizeTitle(title: previewProto.title)
|
||||
|
||||
if let imageProto = previewProto.image {
|
||||
let attachment: Attachment = Attachment(proto: imageProto)
|
||||
try attachment.insert(db)
|
||||
let attachment: Attachment = (preparedAttachments?["\(imageProto.id)"] ?? Attachment(proto: imageProto))
|
||||
try attachment.save(db)
|
||||
|
||||
self.attachmentId = attachment.id
|
||||
}
|
||||
|
|
|
@ -99,7 +99,13 @@ public extension Quote {
|
|||
// MARK: - Protobuf
|
||||
|
||||
public extension Quote {
|
||||
init?(_ db: Database, proto: SNProtoDataMessage, interactionId: Int64, thread: SessionThread) throws {
|
||||
init?(
|
||||
_ db: Database,
|
||||
proto: SNProtoDataMessage,
|
||||
interactionId: Int64,
|
||||
thread: SessionThread,
|
||||
preparedAttachments: [String: Attachment]?
|
||||
) throws {
|
||||
guard
|
||||
let quoteProto = proto.quote,
|
||||
quoteProto.id != 0,
|
||||
|
@ -111,5 +117,12 @@ public extension Quote {
|
|||
self.authorId = quoteProto.author
|
||||
self.body = nil
|
||||
self.attachmentId = nil
|
||||
|
||||
// It shouldn't be possible to have a prepared attachment for a quote (as sending a quote
|
||||
// from an extension isn't possible) but just in case we will save any which exist
|
||||
try quoteProto.attachments
|
||||
.compactMap { $0.thumbnail }
|
||||
.compactMap { preparedAttachments?["\($0.id)"] }
|
||||
.forEach { try $0.save(db) }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,8 @@ public enum MessageReceiveJob: JobExecutor {
|
|||
threadVariant: messageInfo.threadVariant,
|
||||
message: messageInfo.message,
|
||||
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: protoContent
|
||||
associatedWithProto: protoContent,
|
||||
canShowNotification: true
|
||||
)
|
||||
}
|
||||
catch {
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
|
||||
|
||||
import Foundation
|
||||
import GRDB
|
||||
import SessionUtilitiesKit
|
||||
|
||||
public enum ProcessDeadlockWorkAroundJob: JobExecutor {
|
||||
public static let maxFailureCount: Int = -1
|
||||
public static let requiresThreadId: Bool = false
|
||||
public static let requiresInteractionId: Bool = false
|
||||
|
||||
public static func run(
|
||||
_ job: Job,
|
||||
queue: DispatchQueue,
|
||||
success: @escaping (Job, Bool, Dependencies) -> (),
|
||||
failure: @escaping (Job, Error?, Bool, Dependencies) -> (),
|
||||
deferred: @escaping (Job, Dependencies) -> (),
|
||||
using dependencies: Dependencies
|
||||
) {
|
||||
// Don't run when inactive or not in main app
|
||||
guard (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) else {
|
||||
deferred(job, dependencies) // Don't need to do anything if it's not the main app
|
||||
return
|
||||
}
|
||||
|
||||
// Process any DeadlockWorkAround messages
|
||||
do {
|
||||
try DeadlockWorkAround.readProcessAndRemoveRecords()
|
||||
success(job, false, dependencies)
|
||||
}
|
||||
catch {
|
||||
SNLog("[DeadlockWorkAround] Failed due to error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
public static func afterAppShare(
|
||||
_ shareViewController: UIActivityViewController,
|
||||
onShareComplete: ((Bool) -> ())? = nil,
|
||||
using dependencies: Dependencies = Dependencies()
|
||||
) -> UIActivityViewController.CompletionWithItemsHandler {
|
||||
return { [weak shareViewController] _, completed, _, _ in
|
||||
shareViewController?.completionWithItemsHandler = nil
|
||||
|
||||
guard completed else { return }
|
||||
|
||||
// The share extension runs in read only mode and leaves an artifact for the shared content,
|
||||
// now that it's completed we need to
|
||||
ProcessDeadlockWorkAroundJob.run(
|
||||
Job(
|
||||
variant: .processDeadlockWorkAround,
|
||||
behaviour: .runOnce
|
||||
),
|
||||
queue: DispatchQueue.global(qos: .default),
|
||||
success: { _, _, _ in onShareComplete?(completed) },
|
||||
failure: { _, _, _, _ in onShareComplete?(completed) },
|
||||
deferred: { _, _ in onShareComplete?(completed) },
|
||||
using: dependencies
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -249,6 +249,7 @@ public extension Message {
|
|||
do {
|
||||
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
|
||||
db,
|
||||
readOnly: false,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (TimeInterval(rawMessage.info.expirationDateMs) / 1000),
|
||||
serverHash: rawMessage.info.hash,
|
||||
|
@ -305,6 +306,7 @@ public extension Message {
|
|||
) throws -> ProcessedMessage? {
|
||||
return try processRawReceivedMessage(
|
||||
db,
|
||||
readOnly: false,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (
|
||||
(TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000) +
|
||||
|
@ -321,11 +323,13 @@ public extension Message {
|
|||
/// closed group key update messages (the `NotificationServiceExtension` does this itself)
|
||||
static func processRawReceivedMessageAsNotification(
|
||||
_ db: Database,
|
||||
readOnly: Bool,
|
||||
envelope: SNProtoEnvelope,
|
||||
using dependencies: Dependencies = Dependencies()
|
||||
) throws -> ProcessedMessage? {
|
||||
let processedMessage: ProcessedMessage? = try processRawReceivedMessage(
|
||||
db,
|
||||
readOnly: readOnly,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: (
|
||||
(TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000) +
|
||||
|
@ -361,6 +365,7 @@ public extension Message {
|
|||
|
||||
return try processRawReceivedMessage(
|
||||
db,
|
||||
readOnly: false,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: nil,
|
||||
serverHash: nil,
|
||||
|
@ -392,6 +397,7 @@ public extension Message {
|
|||
|
||||
return try processRawReceivedMessage(
|
||||
db,
|
||||
readOnly: false,
|
||||
envelope: envelope,
|
||||
serverExpirationTimestamp: nil,
|
||||
serverHash: nil,
|
||||
|
@ -540,6 +546,7 @@ public extension Message {
|
|||
|
||||
private static func processRawReceivedMessage(
|
||||
_ db: Database,
|
||||
readOnly: Bool,
|
||||
envelope: SNProtoEnvelope,
|
||||
serverExpirationTimestamp: TimeInterval?,
|
||||
serverHash: String?,
|
||||
|
@ -594,20 +601,22 @@ public extension Message {
|
|||
}
|
||||
|
||||
// Prevent ControlMessages from being handled multiple times if not supported
|
||||
do {
|
||||
try ControlMessageProcessRecord(
|
||||
threadId: threadId,
|
||||
message: message,
|
||||
serverExpirationTimestamp: serverExpirationTimestamp
|
||||
)?.insert(db)
|
||||
}
|
||||
catch {
|
||||
// We want to custom handle this
|
||||
if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error {
|
||||
throw MessageReceiverError.duplicateControlMessage
|
||||
if !readOnly {
|
||||
do {
|
||||
try ControlMessageProcessRecord(
|
||||
threadId: threadId,
|
||||
message: message,
|
||||
serverExpirationTimestamp: serverExpirationTimestamp
|
||||
)?.insert(db)
|
||||
}
|
||||
catch {
|
||||
// We want to custom handle this
|
||||
if case DatabaseError.SQLITE_CONSTRAINT_UNIQUE = error {
|
||||
throw MessageReceiverError.duplicateControlMessage
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
|
||||
throw error
|
||||
}
|
||||
|
||||
return (
|
||||
|
|
|
@ -611,6 +611,7 @@ public final class OpenGroupManager {
|
|||
message: messageInfo.message,
|
||||
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: proto,
|
||||
canShowNotification: true,
|
||||
using: dependencies
|
||||
)
|
||||
largestValidSeqNo = max(largestValidSeqNo, message.seqNo)
|
||||
|
@ -694,6 +695,7 @@ public final class OpenGroupManager {
|
|||
public static func handleDirectMessages(
|
||||
_ db: Database,
|
||||
messages: [OpenGroupAPI.DirectMessage],
|
||||
ignoreMessageId: Bool,
|
||||
fromOutbox: Bool,
|
||||
on server: String,
|
||||
using dependencies: Dependencies
|
||||
|
@ -712,16 +714,18 @@ public final class OpenGroupManager {
|
|||
let latestMessageId: Int64 = sortedMessages[sortedMessages.count - 1].id
|
||||
var lookupCache: [String: BlindedIdLookup] = [:] // Only want this cache to exist for the current loop
|
||||
|
||||
// Update the 'latestMessageId' value
|
||||
if fromOutbox {
|
||||
_ = try? OpenGroup
|
||||
.filter(OpenGroup.Columns.server == server.lowercased())
|
||||
.updateAll(db, OpenGroup.Columns.outboxLatestMessageId.set(to: latestMessageId))
|
||||
}
|
||||
else {
|
||||
_ = try? OpenGroup
|
||||
.filter(OpenGroup.Columns.server == server.lowercased())
|
||||
.updateAll(db, OpenGroup.Columns.inboxLatestMessageId.set(to: latestMessageId))
|
||||
// Update the 'latestMessageId' value if we aren't ignoring it
|
||||
if !ignoreMessageId {
|
||||
if fromOutbox {
|
||||
_ = try? OpenGroup
|
||||
.filter(OpenGroup.Columns.server == server.lowercased())
|
||||
.updateAll(db, OpenGroup.Columns.outboxLatestMessageId.set(to: latestMessageId))
|
||||
}
|
||||
else {
|
||||
_ = try? OpenGroup
|
||||
.filter(OpenGroup.Columns.server == server.lowercased())
|
||||
.updateAll(db, OpenGroup.Columns.inboxLatestMessageId.set(to: latestMessageId))
|
||||
}
|
||||
}
|
||||
|
||||
// Process the messages
|
||||
|
@ -794,6 +798,7 @@ public final class OpenGroupManager {
|
|||
message: messageInfo.message,
|
||||
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
|
||||
canShowNotification: true,
|
||||
using: dependencies
|
||||
)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ extension MessageReceiver {
|
|||
message: VisibleMessage,
|
||||
preparedAttachments: [String: Attachment]?,
|
||||
associatedWithProto proto: SNProtoContent,
|
||||
canShowNotification: Bool,
|
||||
using dependencies: Dependencies = Dependencies()
|
||||
) throws -> Int64 {
|
||||
guard let sender: String = message.sender, let dataMessage = proto.dataMessage else {
|
||||
|
@ -199,7 +200,8 @@ extension MessageReceiver {
|
|||
db,
|
||||
proto: dataMessage,
|
||||
interactionId: interactionId,
|
||||
thread: thread
|
||||
thread: thread,
|
||||
preparedAttachments: preparedAttachments
|
||||
)?.inserted(db)
|
||||
|
||||
// Parse link preview if needed
|
||||
|
@ -207,7 +209,8 @@ extension MessageReceiver {
|
|||
db,
|
||||
proto: dataMessage,
|
||||
body: message.text,
|
||||
sentTimestampMs: (messageSentTimestamp * 1000)
|
||||
sentTimestampMs: (messageSentTimestamp * 1000),
|
||||
preparedAttachments: preparedAttachments
|
||||
)?.saved(db)
|
||||
|
||||
// Open group invitations are stored as LinkPreview values so create one if needed
|
||||
|
@ -269,7 +272,11 @@ extension MessageReceiver {
|
|||
}
|
||||
|
||||
// Notify the user if needed
|
||||
guard interactionVariant == .standardIncoming && !interaction.wasRead else { return interactionId }
|
||||
guard
|
||||
canShowNotification &&
|
||||
interactionVariant == .standardIncoming &&
|
||||
!interaction.wasRead
|
||||
else { return interactionId }
|
||||
|
||||
// Use the same identifier for notifications when in backgroud polling to prevent spam
|
||||
Environment.shared?.notificationsManager.wrappedValue?
|
||||
|
|
|
@ -190,6 +190,7 @@ public enum MessageReceiver {
|
|||
preparedAttachments: [String: Attachment]? = nil,
|
||||
serverExpirationTimestamp: TimeInterval?,
|
||||
associatedWithProto proto: SNProtoContent,
|
||||
canShowNotification: Bool,
|
||||
using dependencies: Dependencies = Dependencies()
|
||||
) throws {
|
||||
// Check if the message requires an existing conversation (if it does and the conversation isn't in
|
||||
|
@ -279,7 +280,8 @@ public enum MessageReceiver {
|
|||
threadVariant: threadVariant,
|
||||
message: message,
|
||||
preparedAttachments: preparedAttachments,
|
||||
associatedWithProto: proto
|
||||
associatedWithProto: proto,
|
||||
canShowNotification: canShowNotification
|
||||
)
|
||||
|
||||
// SharedConfigMessages should be handled by the 'SharedUtil' instead of this
|
||||
|
|
|
@ -9,19 +9,19 @@ extension MessageSender {
|
|||
|
||||
// MARK: - Durable
|
||||
|
||||
public static func send(
|
||||
@discardableResult public static func send(
|
||||
_ db: Database,
|
||||
interaction: Interaction,
|
||||
threadId: String,
|
||||
threadVariant: SessionThread.Variant,
|
||||
isSyncMessage: Bool = false,
|
||||
using dependencies: Dependencies
|
||||
) throws {
|
||||
) throws -> Job? {
|
||||
// Only 'VisibleMessage' types can be sent via this method
|
||||
guard interaction.variant == .standardOutgoing else { throw MessageSenderError.invalidMessage }
|
||||
guard let interactionId: Int64 = interaction.id else { throw StorageError.objectNotSaved }
|
||||
|
||||
send(
|
||||
return send(
|
||||
db,
|
||||
message: VisibleMessage.from(db, interaction: interaction),
|
||||
threadId: threadId,
|
||||
|
@ -52,7 +52,7 @@ extension MessageSender {
|
|||
)
|
||||
}
|
||||
|
||||
public static func send(
|
||||
@discardableResult public static func send(
|
||||
_ db: Database,
|
||||
message: Message,
|
||||
threadId: String?,
|
||||
|
@ -60,11 +60,11 @@ extension MessageSender {
|
|||
to destination: Message.Destination,
|
||||
isSyncMessage: Bool = false,
|
||||
using dependencies: Dependencies
|
||||
) {
|
||||
) -> Job? {
|
||||
// If it's a sync message then we need to make some slight tweaks before sending so use the proper
|
||||
// sync message sending process instead of the standard process
|
||||
guard !isSyncMessage else {
|
||||
scheduleSyncMessageIfNeeded(
|
||||
return scheduleSyncMessageIfNeeded(
|
||||
db,
|
||||
message: message,
|
||||
destination: destination,
|
||||
|
@ -73,10 +73,9 @@ extension MessageSender {
|
|||
isAlreadySyncMessage: false,
|
||||
using: dependencies
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
dependencies.jobRunner.add(
|
||||
return dependencies.jobRunner.add(
|
||||
db,
|
||||
job: Job(
|
||||
variant: .messageSend,
|
||||
|
|
|
@ -1149,7 +1149,7 @@ public final class MessageSender {
|
|||
return nil
|
||||
}
|
||||
|
||||
public static func scheduleSyncMessageIfNeeded(
|
||||
@discardableResult public static func scheduleSyncMessageIfNeeded(
|
||||
_ db: Database,
|
||||
message: Message,
|
||||
destination: Message.Destination,
|
||||
|
@ -1157,7 +1157,7 @@ public final class MessageSender {
|
|||
interactionId: Int64?,
|
||||
isAlreadySyncMessage: Bool,
|
||||
using dependencies: Dependencies
|
||||
) {
|
||||
) -> Job? {
|
||||
// Sync the message if it's not a sync message, wasn't already sent to the current user and
|
||||
// it's a message type which should be synced
|
||||
let currentUserPublicKey = getUserHexEncodedPublicKey(db, using: dependencies)
|
||||
|
@ -1171,7 +1171,7 @@ public final class MessageSender {
|
|||
if let message = message as? VisibleMessage { message.syncTarget = publicKey }
|
||||
if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey }
|
||||
|
||||
dependencies.jobRunner.add(
|
||||
return dependencies.jobRunner.add(
|
||||
db,
|
||||
job: Job(
|
||||
variant: .messageSend,
|
||||
|
@ -1187,5 +1187,7 @@ public final class MessageSender {
|
|||
using: dependencies
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -583,6 +583,7 @@ extension OpenGroupAPI {
|
|||
OpenGroupManager.handleDirectMessages(
|
||||
db,
|
||||
messages: messages,
|
||||
ignoreMessageId: false,
|
||||
fromOutbox: fromOutbox,
|
||||
on: server,
|
||||
using: dependencies
|
||||
|
|
|
@ -202,34 +202,40 @@ public enum DeadlockWorkAround {
|
|||
}
|
||||
|
||||
// Process the messages which were successful
|
||||
var completedFilenames: [String] = []
|
||||
|
||||
dependencies.storage.write { db in
|
||||
try deadlockMessages.forEach { deadlockMessage, _ in
|
||||
switch deadlockMessage.variant {
|
||||
case .incomingMessage(let envelopeData):
|
||||
try processIncomingMessage(db, envelopeData: envelopeData, using: dependencies)
|
||||
|
||||
case .incomingCall(let threadId, let threadVariant, let sentTimestamp, let state):
|
||||
try processIncomingCallMessage(
|
||||
db,
|
||||
threadId: threadId,
|
||||
threadVariant: threadVariant,
|
||||
sentTimestamp: sentTimestamp,
|
||||
state: state,
|
||||
using: dependencies
|
||||
)
|
||||
|
||||
case .outgoingMessage, .outgoingOpenGroupMessage, .outgoingOpenGroupInboxMessage:
|
||||
try processOutgoingMessage(db, message: deadlockMessage, using: dependencies)
|
||||
|
||||
case .configSync(let publicKey):
|
||||
processConfigSyncMessage(db, publicKey: publicKey, using: dependencies)
|
||||
completedFilenames = deadlockMessages.compactMap { deadlockMessage, filename in
|
||||
do {
|
||||
switch deadlockMessage.variant {
|
||||
case .incomingMessage(let envelopeData):
|
||||
try processIncomingMessage(db, envelopeData: envelopeData, using: dependencies)
|
||||
|
||||
case .incomingCall(let threadId, let threadVariant, let sentTimestamp, let state):
|
||||
try processIncomingCallMessage(
|
||||
db,
|
||||
threadId: threadId,
|
||||
threadVariant: threadVariant,
|
||||
sentTimestamp: sentTimestamp,
|
||||
state: state,
|
||||
using: dependencies
|
||||
)
|
||||
|
||||
case .outgoingMessage, .outgoingOpenGroupMessage, .outgoingOpenGroupInboxMessage:
|
||||
try processOutgoingMessage(db, message: deadlockMessage, using: dependencies)
|
||||
|
||||
case .configSync(let publicKey):
|
||||
processConfigSyncMessage(db, publicKey: publicKey, using: dependencies)
|
||||
}
|
||||
|
||||
return filename
|
||||
}
|
||||
catch { return nil }
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the files which were parsed successfully - only want to process them once
|
||||
// even if they failed
|
||||
let completedFilenames: [String] = deadlockMessages.map { $0.filename }
|
||||
completedFilenames.forEach { filename in
|
||||
try? FileManager.default.removeItem(atPath: "\(DeadlockWorkAround.sharedDeadlockDirectoryPath)/\(filename)")
|
||||
}
|
||||
|
@ -258,17 +264,20 @@ public enum DeadlockWorkAround {
|
|||
)
|
||||
}
|
||||
let numToRemove: Int = filesToUpdate.filter { $0.shouldDelete }.count
|
||||
SNLog("[DeadlockWorkAround] Completed processing \(deadlockMessages.count) message\(deadlockMessages.count == 1 ? "" : "s") (ignoring \(numToRemove) message\(numToRemove == 1 ? "" : "s"))")
|
||||
SNLog("[DeadlockWorkAround] Completed processing \(deadlockMessages.count) message\(deadlockMessages.count == 1 ? "" : "s") (\(filesToUpdate.count) failed, ignoring \(numToRemove) message\(numToRemove == 1 ? "" : "s"))")
|
||||
|
||||
// Remove/Rename remaining files
|
||||
filesToUpdate
|
||||
.forEach { old, new, shouldRemove in
|
||||
guard !shouldRemove else {
|
||||
try? FileManager.default.removeItem(atPath: old)
|
||||
try? FileManager.default.removeItem(atPath: "\(DeadlockWorkAround.sharedDeadlockDirectoryPath)/\(old)")
|
||||
return
|
||||
}
|
||||
|
||||
try? FileManager.default.moveItem(atPath: old, toPath: new)
|
||||
try? FileManager.default.moveItem(
|
||||
atPath: "\(DeadlockWorkAround.sharedDeadlockDirectoryPath)/\(old)",
|
||||
toPath: "\(DeadlockWorkAround.sharedDeadlockDirectoryPath)/\(new)"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,7 +288,7 @@ public enum DeadlockWorkAround {
|
|||
) throws {
|
||||
guard
|
||||
let envelope: SNProtoEnvelope = try? SNProtoEnvelope.parseData(envelopeData),
|
||||
let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, envelope: envelope)
|
||||
let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, readOnly: false, envelope: envelope)
|
||||
else { return }
|
||||
|
||||
try MessageReceiver.handle(
|
||||
|
@ -288,7 +297,8 @@ public enum DeadlockWorkAround {
|
|||
threadVariant: processedMessage.threadVariant,
|
||||
message: processedMessage.messageInfo.message,
|
||||
serverExpirationTimestamp: processedMessage.messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: processedMessage.proto
|
||||
associatedWithProto: processedMessage.proto,
|
||||
canShowNotification: false
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -338,7 +348,8 @@ public enum DeadlockWorkAround {
|
|||
preparedAttachments: message.attachments?
|
||||
.reduce(into: [:]) { result, next in result[next.serverId ?? ""] = next },
|
||||
serverExpirationTimestamp: processedMessage.messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: processedMessage.proto
|
||||
associatedWithProto: processedMessage.proto,
|
||||
canShowNotification: false
|
||||
)
|
||||
|
||||
case .outgoingOpenGroupMessage(
|
||||
|
@ -381,6 +392,7 @@ public enum DeadlockWorkAround {
|
|||
.reduce(into: [:]) { result, next in result[next.serverId ?? ""] = next },
|
||||
serverExpirationTimestamp: processedMessage.messageInfo.serverExpirationTimestamp,
|
||||
associatedWithProto: processedMessage.proto,
|
||||
canShowNotification: false,
|
||||
using: dependencies
|
||||
)
|
||||
|
||||
|
@ -412,6 +424,7 @@ public enum DeadlockWorkAround {
|
|||
base64EncodedMessage: base64EncodedMessage
|
||||
)
|
||||
],
|
||||
ignoreMessageId: true,
|
||||
fromOutbox: true,
|
||||
on: server,
|
||||
using: dependencies
|
||||
|
|
|
@ -45,8 +45,11 @@ public class NSENotificationPresenter: NSObject, NotificationsProtocol {
|
|||
.replacingMentions(for: thread.id))
|
||||
.defaulting(to: "APN_Message".localized())
|
||||
|
||||
var userInfo: [String: Any] = [ NotificationServiceExtension.isFromRemoteKey: true ]
|
||||
userInfo[NotificationServiceExtension.threadIdKey] = thread.id
|
||||
var userInfo: [String: Any] = [
|
||||
NotificationServiceExtension.isFromRemoteKey: true,
|
||||
NotificationServiceExtension.threadIdKey: thread.id,
|
||||
NotificationServiceExtension.threadVariantRaw: thread.variant.rawValue
|
||||
]
|
||||
|
||||
let notificationContent = UNMutableNotificationContent()
|
||||
notificationContent.userInfo = userInfo
|
||||
|
@ -145,8 +148,11 @@ public class NSENotificationPresenter: NSObject, NotificationsProtocol {
|
|||
// Only notify missed calls
|
||||
guard messageInfo.state == .missed || messageInfo.state == .permissionDenied else { return }
|
||||
|
||||
var userInfo: [String: Any] = [ NotificationServiceExtension.isFromRemoteKey: true ]
|
||||
userInfo[NotificationServiceExtension.threadIdKey] = thread.id
|
||||
var userInfo: [String: Any] = [
|
||||
NotificationServiceExtension.isFromRemoteKey: true,
|
||||
NotificationServiceExtension.threadIdKey: thread.id,
|
||||
NotificationServiceExtension.threadVariantRaw: thread.variant.rawValue
|
||||
]
|
||||
|
||||
let notificationContent = UNMutableNotificationContent()
|
||||
notificationContent.userInfo = userInfo
|
||||
|
@ -206,8 +212,11 @@ public class NSENotificationPresenter: NSObject, NotificationsProtocol {
|
|||
default: notificationBody = NotificationStrings.incomingMessageBody
|
||||
}
|
||||
|
||||
var userInfo: [String: Any] = [ NotificationServiceExtension.isFromRemoteKey: true ]
|
||||
userInfo[NotificationServiceExtension.threadIdKey] = thread.id
|
||||
var userInfo: [String: Any] = [
|
||||
NotificationServiceExtension.isFromRemoteKey: true,
|
||||
NotificationServiceExtension.threadIdKey: thread.id,
|
||||
NotificationServiceExtension.threadVariantRaw: thread.variant.rawValue
|
||||
]
|
||||
|
||||
let notificationContent = UNMutableNotificationContent()
|
||||
notificationContent.userInfo = userInfo
|
||||
|
|
|
@ -18,6 +18,7 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension
|
|||
|
||||
public static let isFromRemoteKey = "remote"
|
||||
public static let threadIdKey = "Signal.AppNotificationsUserInfoKey.threadId"
|
||||
public static let threadVariantRaw = "Signal.AppNotificationsUserInfoKey.threadVariantRaw"
|
||||
public static let threadNotificationCounter = "Session.AppNotificationsUserInfoKey.threadNotificationCounter"
|
||||
|
||||
// MARK: Did receive a remote push notification request
|
||||
|
@ -85,7 +86,7 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension
|
|||
// is added to notification center
|
||||
Storage.shared.read { db in
|
||||
do {
|
||||
guard let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, envelope: envelope) else {
|
||||
guard let processedMessage: ProcessedMessage = try Message.processRawReceivedMessageAsNotification(db, readOnly: true, envelope: envelope) else {
|
||||
self.handleFailure(for: notificationContent)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -262,6 +262,17 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView
|
|||
/// Disappearing Messages, as a result we need to explicitly `getNetworkTime` in order to ensure it's accurate
|
||||
Just(())
|
||||
.setFailureType(to: Error.self)
|
||||
.flatMap {
|
||||
guard !SnodeAPI.hasCachedSnodesIncludingExpired() else {
|
||||
return Just(())
|
||||
.setFailureType(to: Error.self)
|
||||
.eraseToAnyPublisher()
|
||||
}
|
||||
|
||||
return SnodeAPI.getSnodePool()
|
||||
.map { _ in () }
|
||||
.eraseToAnyPublisher()
|
||||
}
|
||||
.flatMap { _ -> AnyPublisher<[Attachment.PreparedUpload], Error> in
|
||||
guard !finalAttachments.isEmpty || linkPreviewInfo != nil else {
|
||||
return SnodeAPI
|
||||
|
|
|
@ -23,7 +23,7 @@ public enum GetSnodePoolJob: JobExecutor {
|
|||
// but we want to succeed this job immediately (since it's marked as blocking), this allows us
|
||||
// to block if we have no Snode pool and prevent other jobs from failing but avoids having to
|
||||
// wait if we already have a potentially valid snode pool
|
||||
guard !SnodeAPI.hasCachedSnodesInclusingExpired() else {
|
||||
guard !SnodeAPI.hasCachedSnodesIncludingExpired() else {
|
||||
SNLog("[GetSnodePoolJob] Has valid cached pool, running async instead")
|
||||
SnodeAPI
|
||||
.getSnodePool()
|
||||
|
|
|
@ -141,7 +141,7 @@ public final class SnodeAPI {
|
|||
|
||||
// MARK: - Public API
|
||||
|
||||
public static func hasCachedSnodesInclusingExpired() -> Bool {
|
||||
public static func hasCachedSnodesIncludingExpired() -> Bool {
|
||||
loadSnodePoolIfNeeded()
|
||||
|
||||
return !hasInsufficientSnodes
|
||||
|
|
|
@ -88,6 +88,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
|
|||
/// (if read receipts are enabled) to notify other members in a conversation that their message was read
|
||||
case sendReadReceipts
|
||||
|
||||
/// This is a job that runs when returning from the background in or to process any messages sent/received
|
||||
/// by the app extensions (since they run in a read-only mode)
|
||||
case processDeadlockWorkAround
|
||||
|
||||
/// This is a job that runs once whenever a message is received to attempt to decode and properly
|
||||
/// process the message
|
||||
case messageReceive = 3000
|
||||
|
|
|
@ -270,6 +270,9 @@ public final class JobRunner: JobRunnerType {
|
|||
self.blockingQueue.mutate {
|
||||
$0?.canStart = { [weak self] queue -> Bool in (self?.canStart(queue: queue) == true) }
|
||||
$0?.onQueueDrained = { [weak self] in
|
||||
// Only consider the blocking queue drained once the app has become active
|
||||
guard self?.appHasBecomeActive.wrappedValue == true else { return }
|
||||
|
||||
// Once all blocking jobs have been completed we want to start running
|
||||
// the remaining job queues
|
||||
self?.startNonBlockingQueues(using: dependencies)
|
||||
|
@ -463,37 +466,43 @@ public final class JobRunner: JobRunnerType {
|
|||
|
||||
// Retrieve any jobs which should run when becoming active
|
||||
let hasCompletedInitialBecomeActive: Bool = self.hasCompletedInitialBecomeActive.wrappedValue
|
||||
let jobsToRun: [Job] = dependencies.storage
|
||||
let jobsToRun: (blocking: [Job], nonBlocking: [Job]) = dependencies.storage
|
||||
.read { db in
|
||||
return try Job
|
||||
let blockingJobs: [Job] = try Job
|
||||
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
|
||||
.filter(Job.Columns.shouldBlock == true)
|
||||
.order(
|
||||
Job.Columns.priority.desc,
|
||||
Job.Columns.id
|
||||
)
|
||||
.fetchAll(db)
|
||||
.filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
|
||||
let nonblockingJobs: [Job] = try Job
|
||||
.filter(Job.Columns.behaviour == Job.Behaviour.recurringOnActive)
|
||||
.filter(Job.Columns.shouldBlock == false)
|
||||
.order(
|
||||
Job.Columns.priority.desc,
|
||||
Job.Columns.id
|
||||
)
|
||||
.fetchAll(db)
|
||||
.filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
|
||||
|
||||
return (blockingJobs, nonblockingJobs)
|
||||
}
|
||||
.defaulting(to: [])
|
||||
.filter { hasCompletedInitialBecomeActive || !$0.shouldSkipLaunchBecomeActive }
|
||||
.defaulting(to: ([], []))
|
||||
|
||||
// Store the current queue state locally to avoid multiple atomic retrievals
|
||||
// Add and start any blocking jobs
|
||||
blockingQueue.wrappedValue?.appDidBecomeActive(
|
||||
with: jobsToRun.blocking,
|
||||
canStart: true,
|
||||
using: dependencies
|
||||
)
|
||||
|
||||
// Add any non-blocking jobs (we don't start these incase there are blocking "on active"
|
||||
// jobs as well)
|
||||
let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.nonBlocking.grouped(by: \.variant)
|
||||
let jobQueues: [Job.Variant: JobQueue] = queues.wrappedValue
|
||||
let blockingQueueIsRunning: Bool = (blockingQueue.wrappedValue?.isRunning.wrappedValue == true)
|
||||
|
||||
guard !jobsToRun.isEmpty else {
|
||||
if !blockingQueueIsRunning {
|
||||
jobQueues.map { _, queue in queue }.asSet().forEach { $0.start(using: dependencies) }
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Add and start any non-blocking jobs (if there are no blocking jobs)
|
||||
//
|
||||
// We only want to trigger the queue to start once so we need to consolidate the
|
||||
// queues to list of jobs (as queues can handle multiple job variants), this means
|
||||
// that 'onActive' jobs will be queued before any standard jobs
|
||||
let jobsByVariant: [Job.Variant: [Job]] = jobsToRun.grouped(by: \.variant)
|
||||
|
||||
|
||||
jobQueues
|
||||
.reduce(into: [:]) { result, variantAndQueue in
|
||||
result[variantAndQueue.value] = (result[variantAndQueue.value] ?? [])
|
||||
|
@ -502,11 +511,20 @@ public final class JobRunner: JobRunnerType {
|
|||
.forEach { queue, jobs in
|
||||
queue.appDidBecomeActive(
|
||||
with: jobs,
|
||||
canStart: !blockingQueueIsRunning,
|
||||
canStart: false,
|
||||
using: dependencies
|
||||
)
|
||||
}
|
||||
|
||||
// If the blocking queue is not running and has no pending (should already be running if it has jobs), otherwise
|
||||
// we should trigger the blockingQueue 'onQueueDrained' logic which will start the other queues
|
||||
// and call any registered callbacks
|
||||
|
||||
// Just in case the logic changes in the future, start the blocking queue if needed which will result
|
||||
// in the blockingQueue 'onQueueDrained' logic being triggered if it's actually empty (starting the
|
||||
// other queues and calling any registered callbacks)
|
||||
blockingQueue.wrappedValue?.start(using: dependencies)
|
||||
|
||||
self.hasCompletedInitialBecomeActive.mutate { $0 = true }
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue