diff --git a/Podfile.lock b/Podfile.lock index 41b826741..5e750d933 100644 --- a/Podfile.lock +++ b/Podfile.lock @@ -27,7 +27,7 @@ PODS: - DifferenceKit/Core (1.2.0) - DifferenceKit/UIKitExtension (1.2.0): - DifferenceKit/Core - - GRDB.swift/SQLCipher (5.24.1): + - GRDB.swift/SQLCipher (5.26.0): - SQLCipher (>= 3.4.0) - libwebp (1.2.1): - libwebp/demux (= 1.2.1) @@ -222,7 +222,7 @@ SPEC CHECKSUMS: CryptoSwift: a532e74ed010f8c95f611d00b8bbae42e9fe7c17 Curve25519Kit: e63f9859ede02438ae3defc5e1a87e09d1ec7ee6 DifferenceKit: 5659c430bb7fe45876fa32ce5cba5d6167f0c805 - GRDB.swift: b3180ce2135fc06a453297889b746b1478c4d8c7 + GRDB.swift: 1395cb3556df6b16ed69dfc74c3886abc75d2825 libwebp: 98a37e597e40bfdb4c911fc98f2c53d0b12d05fc Nimble: 5316ef81a170ce87baf72dd961f22f89a602ff84 NVActivityIndicatorView: 1f6c5687f1171810aa27a3296814dc2d7dec3667 diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index 6c20b39d0..c7ca1ac9c 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -6818,7 +6818,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 363; + CURRENT_PROJECT_VERSION = 365; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", @@ -6890,7 +6890,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 363; + CURRENT_PROJECT_VERSION = 365; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 0d272ee29..27209404c 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -520,16 +520,18 @@ extension ConversationVC: let threadId: String = self.viewModel.threadData.threadId let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant let threadIsMessageRequest: Bool = (self.viewModel.threadData.threadIsMessageRequest == true) + let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: .outgoing, + timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000)) + ) - Storage.shared.writeAsync { db in - TypingIndicators.didStartTyping( - db, - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: .outgoing, - timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000)) - ) + if needsToStartTypingIndicator { + Storage.shared.writeAsync { db in + TypingIndicators.start(db, threadId: threadId, direction: .outgoing) + } } } diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index 9261160a9..f8c5bc1e2 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -428,15 +428,34 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { // MARK: - Functions public func updateDraft(to draft: String) { + let threadId: String = self.threadId + let currentDraft: String = Storage.shared + .read { db in + try SessionThread + .select(.messageDraft) + .filter(id: threadId) + .asRequest(of: String.self) + .fetchOne(db) + } + .defaulting(to: "") + + // Only write the updated draft to the database if it's changed (avoid unnecessary writes) + guard draft != currentDraft else { return } + Storage.shared.writeAsync { db in try SessionThread - .filter(id: self.threadId) + .filter(id: threadId) .updateAll(db, SessionThread.Columns.messageDraft.set(to: draft)) } } public func markAllAsRead() { - guard let lastInteractionId: Int64 = self.threadData.interactionId else { return } + // Don't bother marking anything as read if there are no unread interactions (we can rely + // on the 'threadData.threadUnreadCount' to always be accurate) + guard + (self.threadData.threadUnreadCount ?? 0) > 0, + let lastInteractionId: Int64 = self.threadData.interactionId + else { return } let threadId: String = self.threadData.threadId let trySendReadReceipt: Bool = (self.threadData.threadIsMessageRequest == false) diff --git a/Session/Conversations/Input View/InputViewButton.swift b/Session/Conversations/Input View/InputViewButton.swift index 26d2b495d..a9d2ca015 100644 --- a/Session/Conversations/Input View/InputViewButton.swift +++ b/Session/Conversations/Input View/InputViewButton.swift @@ -59,8 +59,8 @@ final class InputViewButton : UIView { isUserInteractionEnabled = true widthConstraint.isActive = true heightConstraint.isActive = true - let tint = isSendButton ? UIColor.black : Colors.text - let iconImageView = UIImageView(image: icon.withTint(tint)) + let iconImageView = UIImageView(image: icon.withRenderingMode(.alwaysTemplate)) + iconImageView.tintColor = (isSendButton ? UIColor.black : Colors.text) iconImageView.contentMode = .scaleAspectFit let iconSize = InputViewButton.iconSize iconImageView.set(.width, to: iconSize) diff --git a/Session/Conversations/Message Cells/Content Views/CallMessageView.swift b/Session/Conversations/Message Cells/Content Views/CallMessageView.swift index ffc527311..f27302f8f 100644 --- a/Session/Conversations/Message Cells/Content Views/CallMessageView.swift +++ b/Session/Conversations/Message Cells/Content Views/CallMessageView.swift @@ -28,8 +28,8 @@ final class CallMessageView: UIView { // Image view let imageView: UIImageView = UIImageView( image: UIImage(named: "Phone")? + .resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))? .withRenderingMode(.alwaysTemplate) - .resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize)) ) imageView.tintColor = textColor imageView.contentMode = .center diff --git a/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift b/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift index 437689b35..22393d1a9 100644 --- a/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift +++ b/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift @@ -27,11 +27,11 @@ final class DeletedMessageView: UIView { private func setUpViewHierarchy(textColor: UIColor) { // Image view let icon = UIImage(named: "ic_trash")? - .withRenderingMode(.alwaysTemplate) .resizedImage(to: CGSize( width: DeletedMessageView.iconSize, height: DeletedMessageView.iconSize - )) + ))? + .withRenderingMode(.alwaysTemplate) let imageView = UIImageView(image: icon) imageView.tintColor = textColor diff --git a/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift b/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift index fbd65d20a..05ad35c01 100644 --- a/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift +++ b/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift @@ -44,13 +44,13 @@ final class MediaPlaceholderView: UIView { // Image view let imageView = UIImageView( image: UIImage(named: iconName)? - .withRenderingMode(.alwaysTemplate) .resizedImage( to: CGSize( width: MediaPlaceholderView.iconSize, height: MediaPlaceholderView.iconSize ) - ) + )? + .withRenderingMode(.alwaysTemplate) ) imageView.tintColor = textColor imageView.contentMode = .center diff --git a/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift b/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift index f95841449..432b34aff 100644 --- a/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift +++ b/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift @@ -68,8 +68,8 @@ final class OpenGroupInvitationView: UIView { let iconImageViewSize = OpenGroupInvitationView.iconImageViewSize let iconImageView = UIImageView( image: UIImage(named: iconName)? + .resizedImage(to: CGSize(width: iconSize, height: iconSize))? .withRenderingMode(.alwaysTemplate) - .resizedImage(to: CGSize(width: iconSize, height: iconSize)) ) iconImageView.tintColor = .white iconImageView.contentMode = .center diff --git a/Session/Meta/AppDelegate.swift b/Session/Meta/AppDelegate.swift index 41b106584..affde2523 100644 --- a/Session/Meta/AppDelegate.swift +++ b/Session/Meta/AppDelegate.swift @@ -122,6 +122,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// `appDidFinishLaunching` seems to fix this odd behaviour (even though it doesn't match /// Apple's documentation on the matter) UNUserNotificationCenter.current().delegate = self + + // Resume database + NotificationCenter.default.post(name: Database.resumeNotification, object: self) } func applicationDidEnterBackground(_ application: UIApplication) { @@ -130,6 +133,10 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD // NOTE: Fix an edge case where user taps on the callkit notification // but answers the call on another device stopPollers(shouldStopUserPoller: !self.hasIncomingCallWaiting()) + JobRunner.stopAndClearPendingJobs() + + // Suspend database + NotificationCenter.default.post(name: Database.suspendNotification, object: self) } func applicationDidReceiveMemoryWarning(_ application: UIApplication) { @@ -185,8 +192,16 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD // MARK: - Background Fetching func application(_ application: UIApplication, performFetchWithCompletionHandler completionHandler: @escaping (UIBackgroundFetchResult) -> Void) { + // Resume database + NotificationCenter.default.post(name: Database.resumeNotification, object: self) + AppReadiness.runNowOrWhenAppDidBecomeReady { - BackgroundPoller.poll(completionHandler: completionHandler) + BackgroundPoller.poll { result in + // Suspend database + NotificationCenter.default.post(name: Database.suspendNotification, object: self) + + completionHandler(result) + } } } diff --git a/Session/Shared/HighlightMentionBackgroundView.swift b/Session/Shared/HighlightMentionBackgroundView.swift index 1b9a593ba..3a7db4eaa 100644 --- a/Session/Shared/HighlightMentionBackgroundView.swift +++ b/Session/Shared/HighlightMentionBackgroundView.swift @@ -137,9 +137,6 @@ class HighlightMentionBackgroundView: UIView { extraYOffset ) - // We don't want to draw too far to the right - runBounds.size.width = (runBounds.width > lineWidth ? lineWidth : runBounds.width) - let path = UIBezierPath(roundedRect: runBounds, cornerRadius: cornerRadius) mentionBackgroundColor.setFill() path.fill() diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index 9a430ec5e..03f575c48 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -9,8 +9,11 @@ import SessionUtilitiesKit public final class BackgroundPoller { private static var promises: [Promise] = [] + private static var isValid: Bool = false public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) { + BackgroundPoller.isValid = true + promises = [] .appending(pollForMessages()) .appending(contentsOf: pollForClosedGroupMessages()) @@ -32,7 +35,11 @@ public final class BackgroundPoller { let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server) poller.stop() - return poller.poll(isBackgroundPoll: true, isPostCapabilitiesRetry: false) + return poller.poll( + isBackgroundPoll: true, + isBackgroundPollerValid: { BackgroundPoller.isValid }, + isPostCapabilitiesRetry: false + ) } ) @@ -41,6 +48,7 @@ public final class BackgroundPoller { // after 25 seconds allowing us to cancel all pending promises let cancelTimer: Timer = Timer.scheduledTimerOnMainThread(withTimeInterval: 25, repeats: false) { timer in timer.invalidate() + BackgroundPoller.isValid = false guard promises.contains(where: { !$0.isResolved }) else { return } @@ -50,6 +58,9 @@ public final class BackgroundPoller { when(resolved: promises) .done { _ in + // If we have already invalidated the timer then do nothing (we essentially timed out) + guard cancelTimer.isValid else { return } + cancelTimer.invalidate() completionHandler(.newData) } @@ -88,7 +99,8 @@ public final class BackgroundPoller { groupPublicKey, on: DispatchQueue.main, maxRetryCount: 0, - isBackgroundPoll: true + isBackgroundPoll: true, + isBackgroundPollValid: { BackgroundPoller.isValid } ) } } @@ -100,44 +112,45 @@ public final class BackgroundPoller { return SnodeAPI.getMessages(from: snode, associatedWith: publicKey) .then(on: DispatchQueue.main) { messages -> Promise in - guard !messages.isEmpty else { return Promise.value(()) } + guard !messages.isEmpty, BackgroundPoller.isValid else { return Promise.value(()) } var jobsToRun: [Job] = [] Storage.shared.write { db in - var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] - - messages.forEach { message in - do { - let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) - let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) - - threadMessages[key] = (threadMessages[key] ?? []) - .appending(processedMessage?.messageInfo) - } - catch { - switch error { - // Ignore duplicate & selfSend message errors (and don't bother logging - // them as there will be a lot since we each service node duplicates messages) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, - MessageReceiverError.duplicateMessage, - MessageReceiverError.duplicateControlMessage, - MessageReceiverError.selfSend: - break + messages + .compactMap { message -> ProcessedMessage? in + do { + return try Message.processRawReceivedMessage(db, rawMessage: message) + } + catch { + switch error { + // Ignore duplicate & selfSend message errors (and don't bother + // logging them as there will be a lot since we each service node + // duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + // In the background ignore 'SQLITE_ABORT' (it generally means + // the BackgroundPoller has timed out + case DatabaseError.SQLITE_ABORT: break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") + } - default: SNLog("Failed to deserialize envelope due to error: \(error).") + return nil } } - } - - threadMessages + .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } .forEach { threadId, threadMessages in let maybeJob: Job? = Job( variant: .messageReceive, behaviour: .runOnce, threadId: threadId, details: MessageReceiveJob.Details( - messages: threadMessages, + messages: threadMessages.map { $0.messageInfo }, isBackgroundPoll: true ) ) diff --git a/SessionMessagingKit/Database/Models/Capability.swift b/SessionMessagingKit/Database/Models/Capability.swift index 60437fa23..9feda3eb1 100644 --- a/SessionMessagingKit/Database/Models/Capability.swift +++ b/SessionMessagingKit/Database/Models/Capability.swift @@ -59,3 +59,37 @@ public struct Capability: Codable, FetchableRecord, PersistableRecord, TableReco self.isMissing = isMissing } } + +extension Capability.Variant { + // MARK: - Codable + + public init(from decoder: Decoder) throws { + let container: SingleValueDecodingContainer = try decoder.singleValueContainer() + let valueString: String = try container.decode(String.self) + + // FIXME: Remove this code + // There was a point where we didn't have custom Codable handling for the Capability.Variant + // which resulted in the data being encoded into the database as a JSON dict - this code catches + // that case and extracts the standard string value so it can be processed the same as the + // "proper" custom Codable logic) + if valueString.starts(with: "{") { + self = Capability.Variant( + from: valueString + .replacingOccurrences(of: "\":{}}", with: "") + .replacingOccurrences(of: "\"}}", with: "") + .replacingOccurrences(of: "{\"unsupported\":{\"_0\":\"", with: "") + .replacingOccurrences(of: "{\"", with: "") + ) + return + } + // FIXME: Remove this code ^^^ + + self = Capability.Variant(from: valueString) + } + + public func encode(to encoder: Encoder) throws { + var container: SingleValueEncodingContainer = encoder.singleValueContainer() + + try container.encode(rawValue) + } +} diff --git a/SessionMessagingKit/Open Groups/Models/Capabilities.swift b/SessionMessagingKit/Open Groups/Models/Capabilities.swift index 4c4332485..2947214b8 100644 --- a/SessionMessagingKit/Open Groups/Models/Capabilities.swift +++ b/SessionMessagingKit/Open Groups/Models/Capabilities.swift @@ -4,60 +4,14 @@ import Foundation extension OpenGroupAPI { public struct Capabilities: Codable, Equatable { - public enum Capability: Equatable, CaseIterable, Codable { - public static var allCases: [Capability] { - [.sogs, .blind] - } - - case sogs - case blind - - /// Fallback case if the capability isn't supported by this version of the app - case unsupported(String) - - // MARK: - Convenience - - public var rawValue: String { - switch self { - case .unsupported(let originalValue): return originalValue - default: return "\(self)" - } - } - - // MARK: - Initialization - - public init(from valueString: String) { - let maybeValue: Capability? = Capability.allCases.first { $0.rawValue == valueString } - - self = (maybeValue ?? .unsupported(valueString)) - } - } - - public let capabilities: [Capability] - public let missing: [Capability]? + public let capabilities: [Capability.Variant] + public let missing: [Capability.Variant]? // MARK: - Initialization - public init(capabilities: [Capability], missing: [Capability]? = nil) { + public init(capabilities: [Capability.Variant], missing: [Capability.Variant]? = nil) { self.capabilities = capabilities self.missing = missing } } } - -extension OpenGroupAPI.Capabilities.Capability { - // MARK: - Codable - - public init(from decoder: Decoder) throws { - let container: SingleValueDecodingContainer = try decoder.singleValueContainer() - let valueString: String = try container.decode(String.self) - - self = OpenGroupAPI.Capabilities.Capability(from: valueString) - } - - public func encode(to encoder: Encoder) throws { - var container: SingleValueEncodingContainer = encoder.singleValueContainer() - - try container.encode(rawValue) - } -} diff --git a/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift b/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift index f668454bb..f566565f7 100644 --- a/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift +++ b/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift @@ -10,6 +10,7 @@ extension OpenGroupAPI { case sender = "session_id" case posted case edited + case deleted case seqNo = "seqno" case whisper case whisperMods = "whisper_mods" @@ -23,6 +24,7 @@ extension OpenGroupAPI { public let sender: String? public let posted: TimeInterval public let edited: TimeInterval? + public let deleted: Bool? public let seqNo: Int64 public let whisper: Bool public let whisperMods: Bool @@ -79,6 +81,7 @@ extension OpenGroupAPI.Message { sender: try? container.decode(String.self, forKey: .sender), posted: try container.decode(TimeInterval.self, forKey: .posted), edited: try? container.decode(TimeInterval.self, forKey: .edited), + deleted: try? container.decode(Bool.self, forKey: .deleted), seqNo: try container.decode(Int64.self, forKey: .seqNo), whisper: ((try? container.decode(Bool.self, forKey: .whisper)) ?? false), whisperMods: ((try? container.decode(Bool.self, forKey: .whisperMods)) ?? false), diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index c72dea525..ea750d7cd 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -348,7 +348,7 @@ public final class OpenGroupManager: NSObject { capabilities.capabilities.forEach { capability in _ = try? Capability( openGroupServer: server.lowercased(), - variant: Capability.Variant(from: capability.rawValue), + variant: capability, isMissing: false ) .saved(db) @@ -356,7 +356,7 @@ public final class OpenGroupManager: NSObject { capabilities.missing?.forEach { capability in _ = try? Capability( openGroupServer: server.lowercased(), - variant: Capability.Variant(from: capability.rawValue), + variant: capability, isMissing: true ) .saved(db) @@ -499,9 +499,12 @@ public final class OpenGroupManager: NSObject { } let sortedMessages: [OpenGroupAPI.Message] = messages + .filter { $0.deleted != true } .sorted { lhs, rhs in lhs.id < rhs.id } + let messageServerIdsToRemove: [Int64] = messages + .filter { $0.deleted == true } + .map { $0.id } let seqNo: Int64? = sortedMessages.map { $0.seqNo }.max() - var messageServerIdsToRemove: [UInt64] = [] // Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId') if let seqNo: Int64 = seqNo { @@ -515,11 +518,7 @@ public final class OpenGroupManager: NSObject { guard let base64EncodedString: String = message.base64EncodedData, let data = Data(base64Encoded: base64EncodedString) - else { - // A message with no data has been deleted so add it to the list to remove - messageServerIdsToRemove.append(UInt64(message.id)) - return - } + else { return } do { let processedMessage: ProcessedMessage? = try Message.processReceivedOpenGroupMessage( diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift index d875c0f0e..46807cf60 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift @@ -13,8 +13,7 @@ extension MessageReceiver { switch message.kind { case .started: - TypingIndicators.didStartTyping( - db, + let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart( threadId: thread.id, threadVariant: thread.variant, threadIsMessageRequest: thread.isMessageRequest(db), @@ -22,6 +21,10 @@ extension MessageReceiver { timestampMs: message.sentTimestamp.map { Int64($0) } ) + if needsToStartTypingIndicator { + TypingIndicators.start(db, threadId: thread.id, direction: .incoming) + } + case .stopped: TypingIndicators.didStopTyping(db, threadId: thread.id, direction: .incoming) diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index c20391a42..438ffa55f 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -291,7 +291,7 @@ public final class MessageSender { errorCount += 1 guard errorCount == promiseCount else { return } // Only error out if all promises failed - Storage.shared.write { db in + Storage.shared.read { db in handleFailure(db, with: .other(error)) } } @@ -300,7 +300,7 @@ public final class MessageSender { .catch(on: DispatchQueue.global(qos: .default)) { error in SNLog("Couldn't send message due to error: \(error).") - Storage.shared.write { db in + Storage.shared.read { db in handleFailure(db, with: .other(error)) } } @@ -447,7 +447,7 @@ public final class MessageSender { } } .catch(on: DispatchQueue.global(qos: .default)) { error in - dependencies.storage.write { db in + dependencies.storage.read { db in handleFailure(db, with: .other(error)) } } @@ -557,7 +557,7 @@ public final class MessageSender { } } .catch(on: DispatchQueue.global(qos: .default)) { error in - dependencies.storage.write { db in + dependencies.storage.read { db in handleFailure(db, with: .other(error)) } } @@ -652,15 +652,34 @@ public final class MessageSender { with error: MessageSenderError, interactionId: Int64? ) { - // Mark any "sending" recipients as "failed" - _ = try? RecipientState + // Check if we need to mark any "sending" recipients as "failed" + // + // Note: The 'db' could be either read-only or writeable so we determine + // if a change is required, and if so dispatch to a separate queue for the + // actual write + let rowIds: [Int64] = (try? RecipientState + .select(Column.rowID) .filter(RecipientState.Columns.interactionId == interactionId) .filter(RecipientState.Columns.state == RecipientState.State.sending) - .updateAll( - db, - RecipientState.Columns.state.set(to: RecipientState.State.failed), - RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription) - ) + .asRequest(of: Int64.self) + .fetchAll(db)) + .defaulting(to: []) + + guard !rowIds.isEmpty else { return } + + // Need to dispatch to a different thread to prevent a potential db re-entrancy + // issue from occuring in some cases + DispatchQueue.global(qos: .background).async { + Storage.shared.write { db in + try RecipientState + .filter(rowIds.contains(Column.rowID)) + .updateAll( + db, + RecipientState.Columns.state.set(to: RecipientState.State.failed), + RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription) + ) + } + } } // MARK: - Convenience diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index f747a1cc2..ad5c8cda5 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -152,6 +152,7 @@ public final class ClosedGroupPoller { on queue: DispatchQueue = SessionSnodeKit.Threading.workQueue, maxRetryCount: UInt = 0, isBackgroundPoll: Bool = false, + isBackgroundPollValid: @escaping (() -> Bool) = { true }, poller: ClosedGroupPoller? = nil ) -> Promise { let promise: Promise = SnodeAPI.getSwarm(for: groupPublicKey) @@ -160,9 +161,10 @@ public final class ClosedGroupPoller { guard let snode = swarm.randomElement() else { return Promise(error: Error.insufficientSnodes) } return attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) { - guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { - return Promise(error: Error.pollingCanceled) - } + guard + (isBackgroundPoll && isBackgroundPollValid()) || + poller?.isPolling.wrappedValue[groupPublicKey] == true + else { return Promise(error: Error.pollingCanceled) } let promises: [Promise<[SnodeReceivedMessage]>] = { if SnodeAPI.hardfork >= 19 && SnodeAPI.softfork >= 1 { @@ -181,9 +183,13 @@ public final class ClosedGroupPoller { return when(resolved: promises) .then(on: queue) { messageResults -> Promise in - guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) } + guard + (isBackgroundPoll && isBackgroundPollValid()) || + poller?.isPolling.wrappedValue[groupPublicKey] == true + else { return Promise.value(()) } var promises: [Promise] = [] + var jobToRun: Job? = nil let allMessages: [SnodeReceivedMessage] = messageResults .reduce([]) { result, next in switch next { @@ -192,8 +198,16 @@ public final class ClosedGroupPoller { } } var messageCount: Int = 0 - let totalMessagesCount: Int = allMessages.count + // No need to do anything if there are no messages + guard !allMessages.isEmpty else { + if !isBackgroundPoll { + SNLog("Received no new messages in closed group with public key: \(groupPublicKey)") + } + return Promise.value(()) + } + + // Otherwise process the messages and add them to the queue for handling Storage.shared.write { db in let processedMessages: [ProcessedMessage] = allMessages .compactMap { message -> ProcessedMessage? in @@ -209,6 +223,14 @@ public final class ClosedGroupPoller { MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break + + // In the background ignore 'SQLITE_ABORT' (it generally means + // the BackgroundPoller has timed out + case DatabaseError.SQLITE_ABORT: + guard !isBackgroundPoll else { break } + + SNLog("Failed to the database being suspended (running in background with no background task).") + break default: SNLog("Failed to deserialize envelope due to error: \(error).") } @@ -219,7 +241,7 @@ public final class ClosedGroupPoller { messageCount = processedMessages.count - let jobToRun: Job? = Job( + jobToRun = Job( variant: .messageReceive, behaviour: .runOnce, threadId: groupPublicKey, @@ -232,35 +254,29 @@ public final class ClosedGroupPoller { // If we are force-polling then add to the JobRunner so they are persistent and will retry on // the next app run if they fail but don't let them auto-start JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll) - - // We want to try to handle the receive jobs immediately in the background - if isBackgroundPoll { - promises = promises.appending( - jobToRun.map { job -> Promise in - let (promise, seal) = Promise.pending() - - // Note: In the background we just want jobs to fail silently - MessageReceiveJob.run( - job, - queue: queue, - success: { _, _ in seal.fulfill(()) }, - failure: { _, _, _ in seal.fulfill(()) }, - deferred: { _ in seal.fulfill(()) } - ) - - return promise - } - ) - } } - if !isBackgroundPoll { - if totalMessagesCount > 0 { - SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(totalMessagesCount - messageCount))") - } - else { - SNLog("Received no new messages in closed group with public key: \(groupPublicKey)") - } + if isBackgroundPoll { + // We want to try to handle the receive jobs immediately in the background + promises = promises.appending( + jobToRun.map { job -> Promise in + let (promise, seal) = Promise.pending() + + // Note: In the background we just want jobs to fail silently + MessageReceiveJob.run( + job, + queue: queue, + success: { _, _ in seal.fulfill(()) }, + failure: { _, _, _ in seal.fulfill(()) }, + deferred: { _ in seal.fulfill(()) } + ) + + return promise + } + ) + } + else { + SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(allMessages.count - messageCount))") } return when(fulfilled: promises) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index 52c2714fd..fc4b15eb1 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -8,6 +8,8 @@ import SessionUtilitiesKit extension OpenGroupAPI { public final class Poller { + typealias PollResponse = [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)] + private let server: String private var timer: Timer? = nil private var hasStarted = false @@ -71,6 +73,7 @@ extension OpenGroupAPI { @discardableResult public func poll( isBackgroundPoll: Bool, + isBackgroundPollerValid: @escaping (() -> Bool) = { true }, isPostCapabilitiesRetry: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies() ) -> Promise { @@ -83,8 +86,14 @@ extension OpenGroupAPI { Threading.pollerQueue.async { dependencies.storage - .read { db in - OpenGroupAPI + .read { db -> Promise<(Int64, PollResponse)> in + let failureCount: Int64 = (try? OpenGroup + .select(max(OpenGroup.Columns.pollFailureCount)) + .asRequest(of: Int64.self) + .fetchOne(db)) + .defaulting(to: 0) + + return OpenGroupAPI .poll( db, server: server, @@ -95,10 +104,24 @@ extension OpenGroupAPI { ), using: dependencies ) + .map(on: OpenGroupAPI.workQueue) { (failureCount, $0) } } - .done(on: OpenGroupAPI.workQueue) { [weak self] response in + .done(on: OpenGroupAPI.workQueue) { [weak self] failureCount, response in + guard !isBackgroundPoll || isBackgroundPollerValid() else { + // If this was a background poll and the background poll is no longer valid + // then just stop + self?.isPolling = false + seal.fulfill(()) + return + } + self?.isPolling = false - self?.handlePollResponse(response, isBackgroundPoll: isBackgroundPoll, using: dependencies) + self?.handlePollResponse( + response, + failureCount: failureCount, + isBackgroundPoll: isBackgroundPoll, + using: dependencies + ) dependencies.mutableCache.mutate { cache in cache.hasPerformedInitialPoll[server] = true @@ -106,17 +129,18 @@ extension OpenGroupAPI { UserDefaults.standard[.lastOpen] = Date() } - // Reset the failure count - Storage.shared.writeAsync { db in - try OpenGroup - .filter(OpenGroup.Columns.server == server) - .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0)) - } - SNLog("Open group polling finished for \(server).") seal.fulfill(()) } .catch(on: OpenGroupAPI.workQueue) { [weak self] error in + guard !isBackgroundPoll || isBackgroundPollerValid() else { + // If this was a background poll and the background poll is no longer valid + // then just stop + self?.isPolling = false + seal.fulfill(()) + return + } + // If we are retrying then the error is being handled so no need to continue (this // method will always resolve) self?.updateCapabilitiesAndRetryIfNeeded( @@ -141,7 +165,10 @@ extension OpenGroupAPI { Storage.shared.writeAsync { db in try OpenGroup .filter(OpenGroup.Columns.server == server) - .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1))) + .updateAll( + db, + OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1)) + ) } SNLog("Open group polling failed due to error: \(error). Setting failure count to \(pollFailureCount).") @@ -221,18 +248,166 @@ extension OpenGroupAPI { return promise } - private func handlePollResponse(_ response: [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)], isBackgroundPoll: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) { + private func handlePollResponse( + _ response: PollResponse, + failureCount: Int64, + isBackgroundPoll: Bool, + using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies() + ) { let server: String = self.server - - dependencies.storage.write { db in - try response.forEach { endpoint, endpointResponse in + let validResponses: PollResponse = response + .filter { endpoint, endpointResponse in switch endpoint { case .capabilities: - guard let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, let responseBody: Capabilities = responseData.body else { + guard (endpointResponse.data as? BatchSubResponse)?.body != nil else { SNLog("Open group polling failed due to invalid capability data.") - return + return false } + return true + + case .roomPollInfo(let roomToken, _): + guard (endpointResponse.data as? BatchSubResponse)?.body != nil else { + switch (endpointResponse.data as? BatchSubResponse)?.code { + case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.") + default: SNLog("Open group polling failed due to invalid room info data.") + } + return false + } + + return true + + case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _): + guard + let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, + let responseBody: [Failable] = responseData.body + else { + switch (endpointResponse.data as? BatchSubResponse<[Failable]>)?.code { + case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.") + default: SNLog("Open group polling failed due to invalid messages data.") + } + return false + } + + let successfulMessages: [Message] = responseBody.compactMap { $0.value } + + if successfulMessages.count != responseBody.count { + let droppedCount: Int = (responseBody.count - successfulMessages.count) + + SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").") + } + + return !successfulMessages.isEmpty + + case .inbox, .inboxSince, .outbox, .outboxSince: + guard + let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, + !responseData.failedToParseBody + else { + SNLog("Open group polling failed due to invalid inbox/outbox data.") + return false + } + + // Double optional because the server can return a `304` with an empty body + let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? []) + + return !messages.isEmpty + + default: return false // No custom handling needed + } + } + + // If there are no remaining 'validResponses' and there hasn't been a failure then there is + // no need to do anything else + guard !validResponses.isEmpty || failureCount != 0 else { return } + + // Retrieve the current capability & group info to check if anything changed + let rooms: [String] = validResponses + .keys + .compactMap { endpoint -> String? in + switch endpoint { + case .roomPollInfo(let roomToken, _): return roomToken + default: return nil + } + } + let currentInfo: (capabilities: Capabilities, groups: [OpenGroup])? = dependencies.storage.read { db in + let allCapabilities: [Capability] = try Capability + .filter(Capability.Columns.openGroupServer == server) + .fetchAll(db) + let capabilities: Capabilities = Capabilities( + capabilities: allCapabilities + .filter { !$0.isMissing } + .map { $0.variant }, + missing: { + let missingCapabilities: [Capability.Variant] = allCapabilities + .filter { $0.isMissing } + .map { $0.variant } + + return (missingCapabilities.isEmpty ? nil : missingCapabilities) + }() + ) + let openGroupIds: [String] = rooms + .map { OpenGroup.idFor(roomToken: $0, server: server) } + let groups: [OpenGroup] = try OpenGroup + .filter(ids: openGroupIds) + .fetchAll(db) + + return (capabilities, groups) + } + let changedResponses: PollResponse = validResponses + .filter { endpoint, endpointResponse in + switch endpoint { + case .capabilities: + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: Capabilities = responseData.body + else { return false } + + return (responseBody != currentInfo?.capabilities) + + case .roomPollInfo(let roomToken, _): + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: RoomPollInfo = responseData.body + else { return false } + guard let existingOpenGroup: OpenGroup = currentInfo?.groups.first(where: { $0.roomToken == roomToken }) else { + return true + } + + // Note: This might need to be updated in the future when we start tracking + // user permissions if changes to permissions don't trigger a change to + // the 'infoUpdates' + return ( + responseBody.activeUsers != existingOpenGroup.userCount || ( + responseBody.details != nil && + responseBody.details?.infoUpdates != existingOpenGroup.infoUpdates + ) + ) + + default: return true + } + } + + // If there are no 'changedResponses' and there hasn't been a failure then there is + // no need to do anything else + guard !changedResponses.isEmpty || failureCount != 0 else { return } + + dependencies.storage.write { db in + // Reset the failure count + if failureCount > 0 { + try OpenGroup + .filter(OpenGroup.Columns.server == server) + .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0)) + } + + try changedResponses.forEach { endpoint, endpointResponse in + switch endpoint { + case .capabilities: + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: Capabilities = responseData.body + else { return } + OpenGroupManager.handleCapabilities( db, capabilities: responseBody, @@ -240,13 +415,10 @@ extension OpenGroupAPI { ) case .roomPollInfo(let roomToken, _): - guard let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, let responseBody: RoomPollInfo = responseData.body else { - switch (endpointResponse.data as? BatchSubResponse)?.code { - case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.") - default: SNLog("Open group polling failed due to invalid room info data.") - } - return - } + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: RoomPollInfo = responseData.body + else { return } try OpenGroupManager.handlePollInfo( db, @@ -258,24 +430,14 @@ extension OpenGroupAPI { ) case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _): - guard let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, let responseBody: [Failable] = responseData.body else { - switch (endpointResponse.data as? BatchSubResponse<[Failable]>)?.code { - case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.") - default: SNLog("Open group polling failed due to invalid messages data.") - } - return - } - let successfulMessages: [Message] = responseBody.compactMap { $0.value } - - if successfulMessages.count != responseBody.count { - let droppedCount: Int = (responseBody.count - successfulMessages.count) - - SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").") - } + guard + let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, + let responseBody: [Failable] = responseData.body + else { return } OpenGroupManager.handleMessages( db, - messages: successfulMessages, + messages: responseBody.compactMap { $0.value }, for: roomToken, on: server, isBackgroundPoll: isBackgroundPoll, @@ -283,10 +445,10 @@ extension OpenGroupAPI { ) case .inbox, .inboxSince, .outbox, .outboxSince: - guard let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, !responseData.failedToParseBody else { - SNLog("Open group polling failed due to invalid inbox/outbox data.") - return - } + guard + let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, + !responseData.failedToParseBody + else { return } // Double optional because the server can return a `304` with an empty body let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? []) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 9e2c0d310..4877077d2 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -150,6 +150,10 @@ public final class Poller { MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break + + case DatabaseError.SQLITE_ABORT: + SNLog("Failed to the database being suspended (running in background with no background task).") + break default: SNLog("Failed to deserialize envelope due to error: \(error).") } diff --git a/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift b/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift index d2d99806e..cdda8b025 100644 --- a/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift +++ b/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift @@ -44,10 +44,7 @@ public class TypingIndicators { self.timestampMs = (timestampMs ?? Int64(floor(Date().timeIntervalSince1970 * 1000))) } - fileprivate func starting(_ db: Database) -> Indicator { - let direction: Direction = self.direction - let timestampMs: Int64 = self.timestampMs - + fileprivate func start(_ db: Database) { // Start the typing indicator switch direction { case .outgoing: @@ -55,27 +52,17 @@ public class TypingIndicators { case .incoming: try? ThreadTypingIndicator( - threadId: self.threadId, + threadId: threadId, timestampMs: timestampMs ) .save(db) } - // Schedule the 'stopCallback' to cancel the typing indicator - stopTimer?.invalidate() - stopTimer = Timer.scheduledTimerOnMainThread( - withTimeInterval: (direction == .outgoing ? 3 : 5), - repeats: false - ) { [weak self] _ in - Storage.shared.write { db in - self?.stoping(db) - } - } - - return self + // Refresh the timeout since we just started + refreshTimeout() } - @discardableResult fileprivate func stoping(_ db: Database) -> Indicator? { + fileprivate func stop(_ db: Database) { self.refreshTimer?.invalidate() self.refreshTimer = nil self.stopTimer?.invalidate() @@ -84,7 +71,7 @@ public class TypingIndicators { switch direction { case .outgoing: guard let thread: SessionThread = try? SessionThread.fetchOne(db, id: self.threadId) else { - return nil + return } try? MessageSender.send( @@ -99,8 +86,22 @@ public class TypingIndicators { .filter(ThreadTypingIndicator.Columns.threadId == self.threadId) .deleteAll(db) } + } + + fileprivate func refreshTimeout() { + let threadId: String = self.threadId + let direction: Direction = self.direction - return nil + // Schedule the 'stopCallback' to cancel the typing indicator + stopTimer?.invalidate() + stopTimer = Timer.scheduledTimerOnMainThread( + withTimeInterval: (direction == .outgoing ? 3 : 5), + repeats: false + ) { _ in + Storage.shared.write { db in + TypingIndicators.didStopTyping(db, threadId: threadId, direction: direction) + } + } } private func scheduleRefreshCallback(_ db: Database, shouldSend: Bool = true) { @@ -138,56 +139,76 @@ public class TypingIndicators { // MARK: - Functions - public static func didStartTyping( - _ db: Database, + public static func didStartTypingNeedsToStart( threadId: String, threadVariant: SessionThread.Variant, threadIsMessageRequest: Bool, direction: Direction, timestampMs: Int64? - ) { + ) -> Bool { switch direction { case .outgoing: - let updatedIndicator: Indicator? = ( - outgoing.wrappedValue[threadId] ?? - Indicator( - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: direction, - timestampMs: timestampMs - ) - )?.starting(db) + // If we already have an existing typing indicator for this thread then just + // refresh it's timeout (no need to do anything else) + if let existingIndicator: Indicator = outgoing.wrappedValue[threadId] { + existingIndicator.refreshTimeout() + return false + } - outgoing.mutate { $0[threadId] = updatedIndicator } + let newIndicator: Indicator? = Indicator( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: direction, + timestampMs: timestampMs + ) + newIndicator?.refreshTimeout() + + outgoing.mutate { $0[threadId] = newIndicator } + return true case .incoming: - let updatedIndicator: Indicator? = ( - incoming.wrappedValue[threadId] ?? - Indicator( - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: direction, - timestampMs: timestampMs - ) - )?.starting(db) + // If we already have an existing typing indicator for this thread then just + // refresh it's timeout (no need to do anything else) + if let existingIndicator: Indicator = incoming.wrappedValue[threadId] { + existingIndicator.refreshTimeout() + return false + } - incoming.mutate { $0[threadId] = updatedIndicator } + let newIndicator: Indicator? = Indicator( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: direction, + timestampMs: timestampMs + ) + newIndicator?.refreshTimeout() + + incoming.mutate { $0[threadId] = newIndicator } + return true + } + } + + public static func start(_ db: Database, threadId: String, direction: Direction) { + switch direction { + case .outgoing: outgoing.wrappedValue[threadId]?.start(db) + case .incoming: incoming.wrappedValue[threadId]?.start(db) } } public static func didStopTyping(_ db: Database, threadId: String, direction: Direction) { switch direction { case .outgoing: - let updatedIndicator: Indicator? = outgoing.wrappedValue[threadId]?.stoping(db) - - outgoing.mutate { $0[threadId] = updatedIndicator } + if let indicator: Indicator = outgoing.wrappedValue[threadId] { + indicator.stop(db) + outgoing.mutate { $0[threadId] = nil } + } case .incoming: - let updatedIndicator: Indicator? = incoming.wrappedValue[threadId]?.stoping(db) - - incoming.mutate { $0[threadId] = updatedIndicator } + if let indicator: Indicator = incoming.wrappedValue[threadId] { + indicator.stop(db) + incoming.mutate { $0[threadId] = nil } + } } } } diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift index e9b5fbfbb..16b66672d 100644 --- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift +++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift @@ -68,19 +68,34 @@ public extension SnodeReceivedMessageInfo { public extension SnodeReceivedMessageInfo { static func pruneExpiredMessageHashInfo(for snode: Snode, namespace: Int, associatedWith publicKey: String) { - // Delete any expired SnodeReceivedMessageInfo values associated to a specific node + // Delete any expired SnodeReceivedMessageInfo values associated to a specific node (even though + // this runs very quickly we fetch the rowIds we want to delete from a 'read' call to avoid + // blocking the write queue since this method is called very frequently) + let rowIds: [Int64] = Storage.shared + .read { db in + // Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want + // to clear out the legacy hashes) + let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo + .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) + .isNotEmpty(db) + + guard hasNonLegacyHash else { return [] } + + return try SnodeReceivedMessageInfo + .select(Column.rowID) + .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) + .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000)) + .asRequest(of: Int64.self) + .fetchAll(db) + } + .defaulting(to: []) + + // If there are no rowIds to delete then do nothing + guard !rowIds.isEmpty else { return } + Storage.shared.write { db in - // Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want - // to clear out the legacy hashes) - let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo - .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) - .isNotEmpty(db) - - guard hasNonLegacyHash else { return } - try SnodeReceivedMessageInfo - .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) - .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000)) + .filter(rowIds.contains(Column.rowID)) .deleteAll(db) } } diff --git a/SessionUtilitiesKit/Database/Storage.swift b/SessionUtilitiesKit/Database/Storage.swift index 87e283918..65d515f4b 100644 --- a/SessionUtilitiesKit/Database/Storage.swift +++ b/SessionUtilitiesKit/Database/Storage.swift @@ -60,6 +60,7 @@ public final class Storage { // Configure the database and create the DatabasePool for interacting with the database var config = Configuration() config.maximumReaderCount = 10 // Increase the max read connection limit - Default is 5 + config.observesSuspensionNotifications = true // Minimise `0xDEAD10CC` exceptions config.prepareDatabase { db in var keySpec: Data = Storage.getOrGenerateDatabaseKeySpec() defer { keySpec.resetBytes(in: 0.. Value? { + guard let key: Key = key else { return nil } + + return self[key] + } + func setting(_ key: Key?, _ value: Value?) -> [Key: Value] { guard let key: Key = key else { return self } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 0c1950947..c84a11283 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -126,6 +126,9 @@ public final class JobRunner { queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } + // Don't start the queue if the job can't be started + guard canStartJob else { return } + // Start the job runner if needed db.afterNextTransactionCommit { _ in queues.wrappedValue[updatedJob.variant]?.start() @@ -253,6 +256,15 @@ public final class JobRunner { JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true } } + /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run + /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their + /// failure - they _should_ be picked up again the next time the app is launched) + public static func stopAndClearPendingJobs() { + queues.wrappedValue.values.forEach { queue in + queue.stopAndClearPendingJobs() + } + } + public static func isCurrentlyRunning(_ job: Job?) -> Bool { guard let job: Job = job, let jobId: Int64 = job.id else { return false } @@ -347,6 +359,8 @@ private final class JobQueue { } } + private static let deferralLoopThreshold: Int = 3 + private let type: QueueType private let executionType: ExecutionType private let qosClass: DispatchQoS @@ -376,6 +390,7 @@ private final class JobQueue { private var queue: Atomic<[Job]> = Atomic([]) private var jobsCurrentlyRunning: Atomic> = Atomic([]) private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) + private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } @@ -555,7 +570,16 @@ private final class JobQueue { runNextJob() } + fileprivate func stopAndClearPendingJobs() { + isRunning.mutate { $0 = false } + queue.mutate { $0 = [] } + deferLoopTracker.mutate { $0 = [:] } + } + private func runNextJob() { + // Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job) + guard isRunning.wrappedValue else { return } + // Ensure this is running on the correct queue guard DispatchQueue.getSpecific(key: queueKey) == queueContext else { internalQueue.async { [weak self] in @@ -652,7 +676,7 @@ private final class JobQueue { return } - // Update the state to indicate it's running + // Update the state to indicate the particular job is running // // Note: We need to store 'numJobsRemaining' in it's own variable because // the 'SNLog' seems to dispatch to it's own queue which ends up getting @@ -662,7 +686,6 @@ private final class JobQueue { trigger?.invalidate() // Need to invalidate to prevent a memory leak trigger = nil } - isRunning.mutate { $0 = true } jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) numJobsRunning = jobsCurrentlyRunning.count @@ -779,13 +802,20 @@ private final class JobQueue { // `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over // and over and reset their retry backoff in case they fail next time case .recurringOnLaunch, .recurringOnActive: - Storage.shared.write { db in - _ = try job - .with( - failureCount: 0, - nextRunTimestamp: 0 - ) - .saved(db) + if + let jobId: Int64 = job.id, + job.failureCount != 0 && + job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude + { + Storage.shared.write { db in + _ = try Job + .filter(id: jobId) + .updateAll( + db, + Job.Columns.failureCount.set(to: 0), + Job.Columns.nextRunTimestamp.set(to: 0) + ) + } } default: break @@ -927,8 +957,48 @@ private final class JobQueue { /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant /// on other jobs, and it should automatically manage those dependencies) private func handleJobDeferred(_ job: Job) { + var stuckInDeferLoop: Bool = false jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + deferLoopTracker.mutate { + guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else { + $0 = $0.setting( + job.id, + (1, [Date().timeIntervalSince1970]) + ) + return + } + + let timeNow: TimeInterval = Date().timeIntervalSince1970 + stuckInDeferLoop = ( + lastRecord.count >= JobQueue.deferralLoopThreshold && + (timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count) + ) + + $0 = $0.setting( + job.id, + ( + lastRecord.count + 1, + // Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster + // than one loop per second + lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow] + ) + ) + } + + // It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately + // defers itself but then attempts to run again (resulting in an infinite loop); this won't block + // the app since it's on a background thread but can result in 100% of a CPU being used (and a + // battery drain) + // + // This code will maintain an in-memory store for any jobs which are deferred too quickly (ie. + // more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds) + guard !stuckInDeferLoop else { + deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) } + handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false) + return + } + internalQueue.async { [weak self] in self?.runNextJob() } diff --git a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift index 15e2b23a2..8d015095d 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift @@ -11,4 +11,6 @@ public enum JobRunnerError: Error { case missingRequiredDetails case missingDependencies + + case possibleDeferralLoop }