From 1224e539ead4f2724a79218573f587bc4f6cc112 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 5 Aug 2022 17:10:01 +1000 Subject: [PATCH 1/2] Reduced unneeded DB write operations and fixed a few minor UI bugs Updated the database to better support the application getting suspended (0xdead10cc crash) Updated the SOGS message handling to delete messages based on a new 'deleted' flag instead of 'data' being null Updated the code to prevent the typing indicator from needing a DB write block as frequently Updated the code to stop any pending jobs when entering the background (in an attempt to prevent the database suspension from causing issues) Removed the duplicate 'Capabilities.Capability' type (updated 'Capability.Variant' to work in the same way) Fixed a bug where a number of icons (inc. the "download document" icon) were the wrong colour in dark mode Fixed a bug where the '@You' highlight could incorrectly have it's width reduced in some cases (had protection to prevent it being larger than the line, but that is a valid case) Fixed a bug where the JobRunner was starting the background (which could lead to trying to access the database once it had been suspended) Updated to the latest version of GRDB Added some logic to the BackgroundPoller process to try and stop processing if the timeout is triggered (will catch some cases but others will end up logging a bunch of "Database is suspended" errors) Added in some protection to prevent future deferral loops in the JobRunner --- Podfile.lock | 4 +- .../ConversationVC+Interaction.swift | 20 +- .../Conversations/ConversationViewModel.swift | 23 +- .../Input View/InputViewButton.swift | 4 +- .../Content Views/CallMessageView.swift | 2 +- .../Content Views/DeletedMessageView.swift | 4 +- .../Content Views/MediaPlaceholderView.swift | 4 +- .../OpenGroupInvitationView.swift | 2 +- Session/Meta/AppDelegate.swift | 17 +- .../HighlightMentionBackgroundView.swift | 3 - Session/Utilities/BackgroundPoller.swift | 67 +++-- .../Database/Models/Capability.swift | 34 +++ .../Open Groups/Models/Capabilities.swift | 52 +--- .../Open Groups/Models/SOGSMessage.swift | 3 + .../Open Groups/OpenGroupManager.swift | 15 +- .../MessageReceiver+TypingIndicators.swift | 7 +- .../Sending & Receiving/MessageSender.swift | 41 ++- .../Pollers/ClosedGroupPoller.swift | 82 +++--- .../Pollers/OpenGroupPoller.swift | 250 +++++++++++++++--- .../Sending & Receiving/Pollers/Poller.swift | 4 + .../Typing Indicators/TypingIndicators.swift | 123 +++++---- .../Models/SnodeReceivedMessageInfo.swift | 37 ++- SessionUtilitiesKit/Database/Storage.swift | 2 +- .../General/Dictionary+Utilities.swift | 6 + SessionUtilitiesKit/JobRunner/JobRunner.swift | 88 +++++- .../JobRunner/JobRunnerError.swift | 2 + 26 files changed, 625 insertions(+), 271 deletions(-) diff --git a/Podfile.lock b/Podfile.lock index 89c8650fc..37f4ac9c5 100644 --- a/Podfile.lock +++ b/Podfile.lock @@ -27,7 +27,7 @@ PODS: - DifferenceKit/Core (1.2.0) - DifferenceKit/UIKitExtension (1.2.0): - DifferenceKit/Core - - GRDB.swift/SQLCipher (5.24.1): + - GRDB.swift/SQLCipher (5.26.0): - SQLCipher (>= 3.4.0) - libwebp (1.2.1): - libwebp/demux (= 1.2.1) @@ -222,7 +222,7 @@ SPEC CHECKSUMS: CryptoSwift: a532e74ed010f8c95f611d00b8bbae42e9fe7c17 Curve25519Kit: e63f9859ede02438ae3defc5e1a87e09d1ec7ee6 DifferenceKit: 5659c430bb7fe45876fa32ce5cba5d6167f0c805 - GRDB.swift: b3180ce2135fc06a453297889b746b1478c4d8c7 + GRDB.swift: 1395cb3556df6b16ed69dfc74c3886abc75d2825 libwebp: 98a37e597e40bfdb4c911fc98f2c53d0b12d05fc Nimble: 5316ef81a170ce87baf72dd961f22f89a602ff84 NVActivityIndicatorView: 1f6c5687f1171810aa27a3296814dc2d7dec3667 diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 33dfce0fb..caef86415 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -520,16 +520,18 @@ extension ConversationVC: let threadId: String = self.viewModel.threadData.threadId let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant let threadIsMessageRequest: Bool = (self.viewModel.threadData.threadIsMessageRequest == true) + let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: .outgoing, + timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000)) + ) - Storage.shared.writeAsync { db in - TypingIndicators.didStartTyping( - db, - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: .outgoing, - timestampMs: Int64(floor(Date().timeIntervalSince1970 * 1000)) - ) + if needsToStartTypingIndicator { + Storage.shared.writeAsync { db in + TypingIndicators.start(db, threadId: threadId, direction: .outgoing) + } } } diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index 28bf921a7..c151ea467 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -418,15 +418,34 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { // MARK: - Functions public func updateDraft(to draft: String) { + let threadId: String = self.threadId + let currentDraft: String = Storage.shared + .read { db in + try SessionThread + .select(.messageDraft) + .filter(id: threadId) + .asRequest(of: String.self) + .fetchOne(db) + } + .defaulting(to: "") + + // Only write the updated draft to the database if it's changed (avoid unnecessary writes) + guard draft != currentDraft else { return } + Storage.shared.writeAsync { db in try SessionThread - .filter(id: self.threadId) + .filter(id: threadId) .updateAll(db, SessionThread.Columns.messageDraft.set(to: draft)) } } public func markAllAsRead() { - guard let lastInteractionId: Int64 = self.threadData.interactionId else { return } + // Don't bother marking anything as read if there are no unread interactions (we can rely + // on the 'threadData.threadUnreadCount' to always be accurate) + guard + (self.threadData.threadUnreadCount ?? 0) > 0, + let lastInteractionId: Int64 = self.threadData.interactionId + else { return } let threadId: String = self.threadData.threadId let trySendReadReceipt: Bool = (self.threadData.threadIsMessageRequest == false) diff --git a/Session/Conversations/Input View/InputViewButton.swift b/Session/Conversations/Input View/InputViewButton.swift index 26d2b495d..a9d2ca015 100644 --- a/Session/Conversations/Input View/InputViewButton.swift +++ b/Session/Conversations/Input View/InputViewButton.swift @@ -59,8 +59,8 @@ final class InputViewButton : UIView { isUserInteractionEnabled = true widthConstraint.isActive = true heightConstraint.isActive = true - let tint = isSendButton ? UIColor.black : Colors.text - let iconImageView = UIImageView(image: icon.withTint(tint)) + let iconImageView = UIImageView(image: icon.withRenderingMode(.alwaysTemplate)) + iconImageView.tintColor = (isSendButton ? UIColor.black : Colors.text) iconImageView.contentMode = .scaleAspectFit let iconSize = InputViewButton.iconSize iconImageView.set(.width, to: iconSize) diff --git a/Session/Conversations/Message Cells/Content Views/CallMessageView.swift b/Session/Conversations/Message Cells/Content Views/CallMessageView.swift index ffc527311..f27302f8f 100644 --- a/Session/Conversations/Message Cells/Content Views/CallMessageView.swift +++ b/Session/Conversations/Message Cells/Content Views/CallMessageView.swift @@ -28,8 +28,8 @@ final class CallMessageView: UIView { // Image view let imageView: UIImageView = UIImageView( image: UIImage(named: "Phone")? + .resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize))? .withRenderingMode(.alwaysTemplate) - .resizedImage(to: CGSize(width: CallMessageView.iconSize, height: CallMessageView.iconSize)) ) imageView.tintColor = textColor imageView.contentMode = .center diff --git a/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift b/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift index 437689b35..22393d1a9 100644 --- a/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift +++ b/Session/Conversations/Message Cells/Content Views/DeletedMessageView.swift @@ -27,11 +27,11 @@ final class DeletedMessageView: UIView { private func setUpViewHierarchy(textColor: UIColor) { // Image view let icon = UIImage(named: "ic_trash")? - .withRenderingMode(.alwaysTemplate) .resizedImage(to: CGSize( width: DeletedMessageView.iconSize, height: DeletedMessageView.iconSize - )) + ))? + .withRenderingMode(.alwaysTemplate) let imageView = UIImageView(image: icon) imageView.tintColor = textColor diff --git a/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift b/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift index fbd65d20a..05ad35c01 100644 --- a/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift +++ b/Session/Conversations/Message Cells/Content Views/MediaPlaceholderView.swift @@ -44,13 +44,13 @@ final class MediaPlaceholderView: UIView { // Image view let imageView = UIImageView( image: UIImage(named: iconName)? - .withRenderingMode(.alwaysTemplate) .resizedImage( to: CGSize( width: MediaPlaceholderView.iconSize, height: MediaPlaceholderView.iconSize ) - ) + )? + .withRenderingMode(.alwaysTemplate) ) imageView.tintColor = textColor imageView.contentMode = .center diff --git a/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift b/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift index f95841449..432b34aff 100644 --- a/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift +++ b/Session/Conversations/Message Cells/Content Views/OpenGroupInvitationView.swift @@ -68,8 +68,8 @@ final class OpenGroupInvitationView: UIView { let iconImageViewSize = OpenGroupInvitationView.iconImageViewSize let iconImageView = UIImageView( image: UIImage(named: iconName)? + .resizedImage(to: CGSize(width: iconSize, height: iconSize))? .withRenderingMode(.alwaysTemplate) - .resizedImage(to: CGSize(width: iconSize, height: iconSize)) ) iconImageView.tintColor = .white iconImageView.contentMode = .center diff --git a/Session/Meta/AppDelegate.swift b/Session/Meta/AppDelegate.swift index 41b106584..affde2523 100644 --- a/Session/Meta/AppDelegate.swift +++ b/Session/Meta/AppDelegate.swift @@ -122,6 +122,9 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD /// `appDidFinishLaunching` seems to fix this odd behaviour (even though it doesn't match /// Apple's documentation on the matter) UNUserNotificationCenter.current().delegate = self + + // Resume database + NotificationCenter.default.post(name: Database.resumeNotification, object: self) } func applicationDidEnterBackground(_ application: UIApplication) { @@ -130,6 +133,10 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD // NOTE: Fix an edge case where user taps on the callkit notification // but answers the call on another device stopPollers(shouldStopUserPoller: !self.hasIncomingCallWaiting()) + JobRunner.stopAndClearPendingJobs() + + // Suspend database + NotificationCenter.default.post(name: Database.suspendNotification, object: self) } func applicationDidReceiveMemoryWarning(_ application: UIApplication) { @@ -185,8 +192,16 @@ class AppDelegate: UIResponder, UIApplicationDelegate, UNUserNotificationCenterD // MARK: - Background Fetching func application(_ application: UIApplication, performFetchWithCompletionHandler completionHandler: @escaping (UIBackgroundFetchResult) -> Void) { + // Resume database + NotificationCenter.default.post(name: Database.resumeNotification, object: self) + AppReadiness.runNowOrWhenAppDidBecomeReady { - BackgroundPoller.poll(completionHandler: completionHandler) + BackgroundPoller.poll { result in + // Suspend database + NotificationCenter.default.post(name: Database.suspendNotification, object: self) + + completionHandler(result) + } } } diff --git a/Session/Shared/HighlightMentionBackgroundView.swift b/Session/Shared/HighlightMentionBackgroundView.swift index 1b9a593ba..3a7db4eaa 100644 --- a/Session/Shared/HighlightMentionBackgroundView.swift +++ b/Session/Shared/HighlightMentionBackgroundView.swift @@ -137,9 +137,6 @@ class HighlightMentionBackgroundView: UIView { extraYOffset ) - // We don't want to draw too far to the right - runBounds.size.width = (runBounds.width > lineWidth ? lineWidth : runBounds.width) - let path = UIBezierPath(roundedRect: runBounds, cornerRadius: cornerRadius) mentionBackgroundColor.setFill() path.fill() diff --git a/Session/Utilities/BackgroundPoller.swift b/Session/Utilities/BackgroundPoller.swift index 9a430ec5e..03f575c48 100644 --- a/Session/Utilities/BackgroundPoller.swift +++ b/Session/Utilities/BackgroundPoller.swift @@ -9,8 +9,11 @@ import SessionUtilitiesKit public final class BackgroundPoller { private static var promises: [Promise] = [] + private static var isValid: Bool = false public static func poll(completionHandler: @escaping (UIBackgroundFetchResult) -> Void) { + BackgroundPoller.isValid = true + promises = [] .appending(pollForMessages()) .appending(contentsOf: pollForClosedGroupMessages()) @@ -32,7 +35,11 @@ public final class BackgroundPoller { let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server) poller.stop() - return poller.poll(isBackgroundPoll: true, isPostCapabilitiesRetry: false) + return poller.poll( + isBackgroundPoll: true, + isBackgroundPollerValid: { BackgroundPoller.isValid }, + isPostCapabilitiesRetry: false + ) } ) @@ -41,6 +48,7 @@ public final class BackgroundPoller { // after 25 seconds allowing us to cancel all pending promises let cancelTimer: Timer = Timer.scheduledTimerOnMainThread(withTimeInterval: 25, repeats: false) { timer in timer.invalidate() + BackgroundPoller.isValid = false guard promises.contains(where: { !$0.isResolved }) else { return } @@ -50,6 +58,9 @@ public final class BackgroundPoller { when(resolved: promises) .done { _ in + // If we have already invalidated the timer then do nothing (we essentially timed out) + guard cancelTimer.isValid else { return } + cancelTimer.invalidate() completionHandler(.newData) } @@ -88,7 +99,8 @@ public final class BackgroundPoller { groupPublicKey, on: DispatchQueue.main, maxRetryCount: 0, - isBackgroundPoll: true + isBackgroundPoll: true, + isBackgroundPollValid: { BackgroundPoller.isValid } ) } } @@ -100,44 +112,45 @@ public final class BackgroundPoller { return SnodeAPI.getMessages(from: snode, associatedWith: publicKey) .then(on: DispatchQueue.main) { messages -> Promise in - guard !messages.isEmpty else { return Promise.value(()) } + guard !messages.isEmpty, BackgroundPoller.isValid else { return Promise.value(()) } var jobsToRun: [Job] = [] Storage.shared.write { db in - var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:] - - messages.forEach { message in - do { - let processedMessage: ProcessedMessage? = try Message.processRawReceivedMessage(db, rawMessage: message) - let key: String = (processedMessage?.threadId ?? Message.nonThreadMessageId) - - threadMessages[key] = (threadMessages[key] ?? []) - .appending(processedMessage?.messageInfo) - } - catch { - switch error { - // Ignore duplicate & selfSend message errors (and don't bother logging - // them as there will be a lot since we each service node duplicates messages) - case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, - MessageReceiverError.duplicateMessage, - MessageReceiverError.duplicateControlMessage, - MessageReceiverError.selfSend: - break + messages + .compactMap { message -> ProcessedMessage? in + do { + return try Message.processRawReceivedMessage(db, rawMessage: message) + } + catch { + switch error { + // Ignore duplicate & selfSend message errors (and don't bother + // logging them as there will be a lot since we each service node + // duplicates messages) + case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, + MessageReceiverError.duplicateMessage, + MessageReceiverError.duplicateControlMessage, + MessageReceiverError.selfSend: + break + + // In the background ignore 'SQLITE_ABORT' (it generally means + // the BackgroundPoller has timed out + case DatabaseError.SQLITE_ABORT: break + + default: SNLog("Failed to deserialize envelope due to error: \(error).") + } - default: SNLog("Failed to deserialize envelope due to error: \(error).") + return nil } } - } - - threadMessages + .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } .forEach { threadId, threadMessages in let maybeJob: Job? = Job( variant: .messageReceive, behaviour: .runOnce, threadId: threadId, details: MessageReceiveJob.Details( - messages: threadMessages, + messages: threadMessages.map { $0.messageInfo }, isBackgroundPoll: true ) ) diff --git a/SessionMessagingKit/Database/Models/Capability.swift b/SessionMessagingKit/Database/Models/Capability.swift index 60437fa23..9feda3eb1 100644 --- a/SessionMessagingKit/Database/Models/Capability.swift +++ b/SessionMessagingKit/Database/Models/Capability.swift @@ -59,3 +59,37 @@ public struct Capability: Codable, FetchableRecord, PersistableRecord, TableReco self.isMissing = isMissing } } + +extension Capability.Variant { + // MARK: - Codable + + public init(from decoder: Decoder) throws { + let container: SingleValueDecodingContainer = try decoder.singleValueContainer() + let valueString: String = try container.decode(String.self) + + // FIXME: Remove this code + // There was a point where we didn't have custom Codable handling for the Capability.Variant + // which resulted in the data being encoded into the database as a JSON dict - this code catches + // that case and extracts the standard string value so it can be processed the same as the + // "proper" custom Codable logic) + if valueString.starts(with: "{") { + self = Capability.Variant( + from: valueString + .replacingOccurrences(of: "\":{}}", with: "") + .replacingOccurrences(of: "\"}}", with: "") + .replacingOccurrences(of: "{\"unsupported\":{\"_0\":\"", with: "") + .replacingOccurrences(of: "{\"", with: "") + ) + return + } + // FIXME: Remove this code ^^^ + + self = Capability.Variant(from: valueString) + } + + public func encode(to encoder: Encoder) throws { + var container: SingleValueEncodingContainer = encoder.singleValueContainer() + + try container.encode(rawValue) + } +} diff --git a/SessionMessagingKit/Open Groups/Models/Capabilities.swift b/SessionMessagingKit/Open Groups/Models/Capabilities.swift index 4c4332485..2947214b8 100644 --- a/SessionMessagingKit/Open Groups/Models/Capabilities.swift +++ b/SessionMessagingKit/Open Groups/Models/Capabilities.swift @@ -4,60 +4,14 @@ import Foundation extension OpenGroupAPI { public struct Capabilities: Codable, Equatable { - public enum Capability: Equatable, CaseIterable, Codable { - public static var allCases: [Capability] { - [.sogs, .blind] - } - - case sogs - case blind - - /// Fallback case if the capability isn't supported by this version of the app - case unsupported(String) - - // MARK: - Convenience - - public var rawValue: String { - switch self { - case .unsupported(let originalValue): return originalValue - default: return "\(self)" - } - } - - // MARK: - Initialization - - public init(from valueString: String) { - let maybeValue: Capability? = Capability.allCases.first { $0.rawValue == valueString } - - self = (maybeValue ?? .unsupported(valueString)) - } - } - - public let capabilities: [Capability] - public let missing: [Capability]? + public let capabilities: [Capability.Variant] + public let missing: [Capability.Variant]? // MARK: - Initialization - public init(capabilities: [Capability], missing: [Capability]? = nil) { + public init(capabilities: [Capability.Variant], missing: [Capability.Variant]? = nil) { self.capabilities = capabilities self.missing = missing } } } - -extension OpenGroupAPI.Capabilities.Capability { - // MARK: - Codable - - public init(from decoder: Decoder) throws { - let container: SingleValueDecodingContainer = try decoder.singleValueContainer() - let valueString: String = try container.decode(String.self) - - self = OpenGroupAPI.Capabilities.Capability(from: valueString) - } - - public func encode(to encoder: Encoder) throws { - var container: SingleValueEncodingContainer = encoder.singleValueContainer() - - try container.encode(rawValue) - } -} diff --git a/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift b/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift index f668454bb..f566565f7 100644 --- a/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift +++ b/SessionMessagingKit/Open Groups/Models/SOGSMessage.swift @@ -10,6 +10,7 @@ extension OpenGroupAPI { case sender = "session_id" case posted case edited + case deleted case seqNo = "seqno" case whisper case whisperMods = "whisper_mods" @@ -23,6 +24,7 @@ extension OpenGroupAPI { public let sender: String? public let posted: TimeInterval public let edited: TimeInterval? + public let deleted: Bool? public let seqNo: Int64 public let whisper: Bool public let whisperMods: Bool @@ -79,6 +81,7 @@ extension OpenGroupAPI.Message { sender: try? container.decode(String.self, forKey: .sender), posted: try container.decode(TimeInterval.self, forKey: .posted), edited: try? container.decode(TimeInterval.self, forKey: .edited), + deleted: try? container.decode(Bool.self, forKey: .deleted), seqNo: try container.decode(Int64.self, forKey: .seqNo), whisper: ((try? container.decode(Bool.self, forKey: .whisper)) ?? false), whisperMods: ((try? container.decode(Bool.self, forKey: .whisperMods)) ?? false), diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index c72dea525..ea750d7cd 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -348,7 +348,7 @@ public final class OpenGroupManager: NSObject { capabilities.capabilities.forEach { capability in _ = try? Capability( openGroupServer: server.lowercased(), - variant: Capability.Variant(from: capability.rawValue), + variant: capability, isMissing: false ) .saved(db) @@ -356,7 +356,7 @@ public final class OpenGroupManager: NSObject { capabilities.missing?.forEach { capability in _ = try? Capability( openGroupServer: server.lowercased(), - variant: Capability.Variant(from: capability.rawValue), + variant: capability, isMissing: true ) .saved(db) @@ -499,9 +499,12 @@ public final class OpenGroupManager: NSObject { } let sortedMessages: [OpenGroupAPI.Message] = messages + .filter { $0.deleted != true } .sorted { lhs, rhs in lhs.id < rhs.id } + let messageServerIdsToRemove: [Int64] = messages + .filter { $0.deleted == true } + .map { $0.id } let seqNo: Int64? = sortedMessages.map { $0.seqNo }.max() - var messageServerIdsToRemove: [UInt64] = [] // Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId') if let seqNo: Int64 = seqNo { @@ -515,11 +518,7 @@ public final class OpenGroupManager: NSObject { guard let base64EncodedString: String = message.base64EncodedData, let data = Data(base64Encoded: base64EncodedString) - else { - // A message with no data has been deleted so add it to the list to remove - messageServerIdsToRemove.append(UInt64(message.id)) - return - } + else { return } do { let processedMessage: ProcessedMessage? = try Message.processReceivedOpenGroupMessage( diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift index d875c0f0e..46807cf60 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+TypingIndicators.swift @@ -13,8 +13,7 @@ extension MessageReceiver { switch message.kind { case .started: - TypingIndicators.didStartTyping( - db, + let needsToStartTypingIndicator: Bool = TypingIndicators.didStartTypingNeedsToStart( threadId: thread.id, threadVariant: thread.variant, threadIsMessageRequest: thread.isMessageRequest(db), @@ -22,6 +21,10 @@ extension MessageReceiver { timestampMs: message.sentTimestamp.map { Int64($0) } ) + if needsToStartTypingIndicator { + TypingIndicators.start(db, threadId: thread.id, direction: .incoming) + } + case .stopped: TypingIndicators.didStopTyping(db, threadId: thread.id, direction: .incoming) diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index c20391a42..438ffa55f 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -291,7 +291,7 @@ public final class MessageSender { errorCount += 1 guard errorCount == promiseCount else { return } // Only error out if all promises failed - Storage.shared.write { db in + Storage.shared.read { db in handleFailure(db, with: .other(error)) } } @@ -300,7 +300,7 @@ public final class MessageSender { .catch(on: DispatchQueue.global(qos: .default)) { error in SNLog("Couldn't send message due to error: \(error).") - Storage.shared.write { db in + Storage.shared.read { db in handleFailure(db, with: .other(error)) } } @@ -447,7 +447,7 @@ public final class MessageSender { } } .catch(on: DispatchQueue.global(qos: .default)) { error in - dependencies.storage.write { db in + dependencies.storage.read { db in handleFailure(db, with: .other(error)) } } @@ -557,7 +557,7 @@ public final class MessageSender { } } .catch(on: DispatchQueue.global(qos: .default)) { error in - dependencies.storage.write { db in + dependencies.storage.read { db in handleFailure(db, with: .other(error)) } } @@ -652,15 +652,34 @@ public final class MessageSender { with error: MessageSenderError, interactionId: Int64? ) { - // Mark any "sending" recipients as "failed" - _ = try? RecipientState + // Check if we need to mark any "sending" recipients as "failed" + // + // Note: The 'db' could be either read-only or writeable so we determine + // if a change is required, and if so dispatch to a separate queue for the + // actual write + let rowIds: [Int64] = (try? RecipientState + .select(Column.rowID) .filter(RecipientState.Columns.interactionId == interactionId) .filter(RecipientState.Columns.state == RecipientState.State.sending) - .updateAll( - db, - RecipientState.Columns.state.set(to: RecipientState.State.failed), - RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription) - ) + .asRequest(of: Int64.self) + .fetchAll(db)) + .defaulting(to: []) + + guard !rowIds.isEmpty else { return } + + // Need to dispatch to a different thread to prevent a potential db re-entrancy + // issue from occuring in some cases + DispatchQueue.global(qos: .background).async { + Storage.shared.write { db in + try RecipientState + .filter(rowIds.contains(Column.rowID)) + .updateAll( + db, + RecipientState.Columns.state.set(to: RecipientState.State.failed), + RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription) + ) + } + } } // MARK: - Convenience diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift index f747a1cc2..ad5c8cda5 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/ClosedGroupPoller.swift @@ -152,6 +152,7 @@ public final class ClosedGroupPoller { on queue: DispatchQueue = SessionSnodeKit.Threading.workQueue, maxRetryCount: UInt = 0, isBackgroundPoll: Bool = false, + isBackgroundPollValid: @escaping (() -> Bool) = { true }, poller: ClosedGroupPoller? = nil ) -> Promise { let promise: Promise = SnodeAPI.getSwarm(for: groupPublicKey) @@ -160,9 +161,10 @@ public final class ClosedGroupPoller { guard let snode = swarm.randomElement() else { return Promise(error: Error.insufficientSnodes) } return attempt(maxRetryCount: maxRetryCount, recoveringOn: queue) { - guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { - return Promise(error: Error.pollingCanceled) - } + guard + (isBackgroundPoll && isBackgroundPollValid()) || + poller?.isPolling.wrappedValue[groupPublicKey] == true + else { return Promise(error: Error.pollingCanceled) } let promises: [Promise<[SnodeReceivedMessage]>] = { if SnodeAPI.hardfork >= 19 && SnodeAPI.softfork >= 1 { @@ -181,9 +183,13 @@ public final class ClosedGroupPoller { return when(resolved: promises) .then(on: queue) { messageResults -> Promise in - guard isBackgroundPoll || poller?.isPolling.wrappedValue[groupPublicKey] == true else { return Promise.value(()) } + guard + (isBackgroundPoll && isBackgroundPollValid()) || + poller?.isPolling.wrappedValue[groupPublicKey] == true + else { return Promise.value(()) } var promises: [Promise] = [] + var jobToRun: Job? = nil let allMessages: [SnodeReceivedMessage] = messageResults .reduce([]) { result, next in switch next { @@ -192,8 +198,16 @@ public final class ClosedGroupPoller { } } var messageCount: Int = 0 - let totalMessagesCount: Int = allMessages.count + // No need to do anything if there are no messages + guard !allMessages.isEmpty else { + if !isBackgroundPoll { + SNLog("Received no new messages in closed group with public key: \(groupPublicKey)") + } + return Promise.value(()) + } + + // Otherwise process the messages and add them to the queue for handling Storage.shared.write { db in let processedMessages: [ProcessedMessage] = allMessages .compactMap { message -> ProcessedMessage? in @@ -209,6 +223,14 @@ public final class ClosedGroupPoller { MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break + + // In the background ignore 'SQLITE_ABORT' (it generally means + // the BackgroundPoller has timed out + case DatabaseError.SQLITE_ABORT: + guard !isBackgroundPoll else { break } + + SNLog("Failed to the database being suspended (running in background with no background task).") + break default: SNLog("Failed to deserialize envelope due to error: \(error).") } @@ -219,7 +241,7 @@ public final class ClosedGroupPoller { messageCount = processedMessages.count - let jobToRun: Job? = Job( + jobToRun = Job( variant: .messageReceive, behaviour: .runOnce, threadId: groupPublicKey, @@ -232,35 +254,29 @@ public final class ClosedGroupPoller { // If we are force-polling then add to the JobRunner so they are persistent and will retry on // the next app run if they fail but don't let them auto-start JobRunner.add(db, job: jobToRun, canStartJob: !isBackgroundPoll) - - // We want to try to handle the receive jobs immediately in the background - if isBackgroundPoll { - promises = promises.appending( - jobToRun.map { job -> Promise in - let (promise, seal) = Promise.pending() - - // Note: In the background we just want jobs to fail silently - MessageReceiveJob.run( - job, - queue: queue, - success: { _, _ in seal.fulfill(()) }, - failure: { _, _, _ in seal.fulfill(()) }, - deferred: { _ in seal.fulfill(()) } - ) - - return promise - } - ) - } } - if !isBackgroundPoll { - if totalMessagesCount > 0 { - SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(totalMessagesCount - messageCount))") - } - else { - SNLog("Received no new messages in closed group with public key: \(groupPublicKey)") - } + if isBackgroundPoll { + // We want to try to handle the receive jobs immediately in the background + promises = promises.appending( + jobToRun.map { job -> Promise in + let (promise, seal) = Promise.pending() + + // Note: In the background we just want jobs to fail silently + MessageReceiveJob.run( + job, + queue: queue, + success: { _, _ in seal.fulfill(()) }, + failure: { _, _, _ in seal.fulfill(()) }, + deferred: { _ in seal.fulfill(()) } + ) + + return promise + } + ) + } + else { + SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") in closed group with public key: \(groupPublicKey) (duplicates: \(allMessages.count - messageCount))") } return when(fulfilled: promises) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index 52c2714fd..fc4b15eb1 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -8,6 +8,8 @@ import SessionUtilitiesKit extension OpenGroupAPI { public final class Poller { + typealias PollResponse = [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)] + private let server: String private var timer: Timer? = nil private var hasStarted = false @@ -71,6 +73,7 @@ extension OpenGroupAPI { @discardableResult public func poll( isBackgroundPoll: Bool, + isBackgroundPollerValid: @escaping (() -> Bool) = { true }, isPostCapabilitiesRetry: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies() ) -> Promise { @@ -83,8 +86,14 @@ extension OpenGroupAPI { Threading.pollerQueue.async { dependencies.storage - .read { db in - OpenGroupAPI + .read { db -> Promise<(Int64, PollResponse)> in + let failureCount: Int64 = (try? OpenGroup + .select(max(OpenGroup.Columns.pollFailureCount)) + .asRequest(of: Int64.self) + .fetchOne(db)) + .defaulting(to: 0) + + return OpenGroupAPI .poll( db, server: server, @@ -95,10 +104,24 @@ extension OpenGroupAPI { ), using: dependencies ) + .map(on: OpenGroupAPI.workQueue) { (failureCount, $0) } } - .done(on: OpenGroupAPI.workQueue) { [weak self] response in + .done(on: OpenGroupAPI.workQueue) { [weak self] failureCount, response in + guard !isBackgroundPoll || isBackgroundPollerValid() else { + // If this was a background poll and the background poll is no longer valid + // then just stop + self?.isPolling = false + seal.fulfill(()) + return + } + self?.isPolling = false - self?.handlePollResponse(response, isBackgroundPoll: isBackgroundPoll, using: dependencies) + self?.handlePollResponse( + response, + failureCount: failureCount, + isBackgroundPoll: isBackgroundPoll, + using: dependencies + ) dependencies.mutableCache.mutate { cache in cache.hasPerformedInitialPoll[server] = true @@ -106,17 +129,18 @@ extension OpenGroupAPI { UserDefaults.standard[.lastOpen] = Date() } - // Reset the failure count - Storage.shared.writeAsync { db in - try OpenGroup - .filter(OpenGroup.Columns.server == server) - .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0)) - } - SNLog("Open group polling finished for \(server).") seal.fulfill(()) } .catch(on: OpenGroupAPI.workQueue) { [weak self] error in + guard !isBackgroundPoll || isBackgroundPollerValid() else { + // If this was a background poll and the background poll is no longer valid + // then just stop + self?.isPolling = false + seal.fulfill(()) + return + } + // If we are retrying then the error is being handled so no need to continue (this // method will always resolve) self?.updateCapabilitiesAndRetryIfNeeded( @@ -141,7 +165,10 @@ extension OpenGroupAPI { Storage.shared.writeAsync { db in try OpenGroup .filter(OpenGroup.Columns.server == server) - .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1))) + .updateAll( + db, + OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1)) + ) } SNLog("Open group polling failed due to error: \(error). Setting failure count to \(pollFailureCount).") @@ -221,18 +248,166 @@ extension OpenGroupAPI { return promise } - private func handlePollResponse(_ response: [OpenGroupAPI.Endpoint: (info: OnionRequestResponseInfoType, data: Codable?)], isBackgroundPoll: Bool, using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) { + private func handlePollResponse( + _ response: PollResponse, + failureCount: Int64, + isBackgroundPoll: Bool, + using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies() + ) { let server: String = self.server - - dependencies.storage.write { db in - try response.forEach { endpoint, endpointResponse in + let validResponses: PollResponse = response + .filter { endpoint, endpointResponse in switch endpoint { case .capabilities: - guard let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, let responseBody: Capabilities = responseData.body else { + guard (endpointResponse.data as? BatchSubResponse)?.body != nil else { SNLog("Open group polling failed due to invalid capability data.") - return + return false } + return true + + case .roomPollInfo(let roomToken, _): + guard (endpointResponse.data as? BatchSubResponse)?.body != nil else { + switch (endpointResponse.data as? BatchSubResponse)?.code { + case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.") + default: SNLog("Open group polling failed due to invalid room info data.") + } + return false + } + + return true + + case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _): + guard + let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, + let responseBody: [Failable] = responseData.body + else { + switch (endpointResponse.data as? BatchSubResponse<[Failable]>)?.code { + case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.") + default: SNLog("Open group polling failed due to invalid messages data.") + } + return false + } + + let successfulMessages: [Message] = responseBody.compactMap { $0.value } + + if successfulMessages.count != responseBody.count { + let droppedCount: Int = (responseBody.count - successfulMessages.count) + + SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").") + } + + return !successfulMessages.isEmpty + + case .inbox, .inboxSince, .outbox, .outboxSince: + guard + let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, + !responseData.failedToParseBody + else { + SNLog("Open group polling failed due to invalid inbox/outbox data.") + return false + } + + // Double optional because the server can return a `304` with an empty body + let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? []) + + return !messages.isEmpty + + default: return false // No custom handling needed + } + } + + // If there are no remaining 'validResponses' and there hasn't been a failure then there is + // no need to do anything else + guard !validResponses.isEmpty || failureCount != 0 else { return } + + // Retrieve the current capability & group info to check if anything changed + let rooms: [String] = validResponses + .keys + .compactMap { endpoint -> String? in + switch endpoint { + case .roomPollInfo(let roomToken, _): return roomToken + default: return nil + } + } + let currentInfo: (capabilities: Capabilities, groups: [OpenGroup])? = dependencies.storage.read { db in + let allCapabilities: [Capability] = try Capability + .filter(Capability.Columns.openGroupServer == server) + .fetchAll(db) + let capabilities: Capabilities = Capabilities( + capabilities: allCapabilities + .filter { !$0.isMissing } + .map { $0.variant }, + missing: { + let missingCapabilities: [Capability.Variant] = allCapabilities + .filter { $0.isMissing } + .map { $0.variant } + + return (missingCapabilities.isEmpty ? nil : missingCapabilities) + }() + ) + let openGroupIds: [String] = rooms + .map { OpenGroup.idFor(roomToken: $0, server: server) } + let groups: [OpenGroup] = try OpenGroup + .filter(ids: openGroupIds) + .fetchAll(db) + + return (capabilities, groups) + } + let changedResponses: PollResponse = validResponses + .filter { endpoint, endpointResponse in + switch endpoint { + case .capabilities: + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: Capabilities = responseData.body + else { return false } + + return (responseBody != currentInfo?.capabilities) + + case .roomPollInfo(let roomToken, _): + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: RoomPollInfo = responseData.body + else { return false } + guard let existingOpenGroup: OpenGroup = currentInfo?.groups.first(where: { $0.roomToken == roomToken }) else { + return true + } + + // Note: This might need to be updated in the future when we start tracking + // user permissions if changes to permissions don't trigger a change to + // the 'infoUpdates' + return ( + responseBody.activeUsers != existingOpenGroup.userCount || ( + responseBody.details != nil && + responseBody.details?.infoUpdates != existingOpenGroup.infoUpdates + ) + ) + + default: return true + } + } + + // If there are no 'changedResponses' and there hasn't been a failure then there is + // no need to do anything else + guard !changedResponses.isEmpty || failureCount != 0 else { return } + + dependencies.storage.write { db in + // Reset the failure count + if failureCount > 0 { + try OpenGroup + .filter(OpenGroup.Columns.server == server) + .updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0)) + } + + try changedResponses.forEach { endpoint, endpointResponse in + switch endpoint { + case .capabilities: + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: Capabilities = responseData.body + else { return } + OpenGroupManager.handleCapabilities( db, capabilities: responseBody, @@ -240,13 +415,10 @@ extension OpenGroupAPI { ) case .roomPollInfo(let roomToken, _): - guard let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, let responseBody: RoomPollInfo = responseData.body else { - switch (endpointResponse.data as? BatchSubResponse)?.code { - case 404: SNLog("Open group polling failed to retrieve info for unknown room '\(roomToken)'.") - default: SNLog("Open group polling failed due to invalid room info data.") - } - return - } + guard + let responseData: BatchSubResponse = endpointResponse.data as? BatchSubResponse, + let responseBody: RoomPollInfo = responseData.body + else { return } try OpenGroupManager.handlePollInfo( db, @@ -258,24 +430,14 @@ extension OpenGroupAPI { ) case .roomMessagesRecent(let roomToken), .roomMessagesBefore(let roomToken, _), .roomMessagesSince(let roomToken, _): - guard let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, let responseBody: [Failable] = responseData.body else { - switch (endpointResponse.data as? BatchSubResponse<[Failable]>)?.code { - case 404: SNLog("Open group polling failed to retrieve messages for unknown room '\(roomToken)'.") - default: SNLog("Open group polling failed due to invalid messages data.") - } - return - } - let successfulMessages: [Message] = responseBody.compactMap { $0.value } - - if successfulMessages.count != responseBody.count { - let droppedCount: Int = (responseBody.count - successfulMessages.count) - - SNLog("Dropped \(droppedCount) invalid open group message\(droppedCount == 1 ? "" : "s").") - } + guard + let responseData: BatchSubResponse<[Failable]> = endpointResponse.data as? BatchSubResponse<[Failable]>, + let responseBody: [Failable] = responseData.body + else { return } OpenGroupManager.handleMessages( db, - messages: successfulMessages, + messages: responseBody.compactMap { $0.value }, for: roomToken, on: server, isBackgroundPoll: isBackgroundPoll, @@ -283,10 +445,10 @@ extension OpenGroupAPI { ) case .inbox, .inboxSince, .outbox, .outboxSince: - guard let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, !responseData.failedToParseBody else { - SNLog("Open group polling failed due to invalid inbox/outbox data.") - return - } + guard + let responseData: BatchSubResponse<[DirectMessage]?> = endpointResponse.data as? BatchSubResponse<[DirectMessage]?>, + !responseData.failedToParseBody + else { return } // Double optional because the server can return a `304` with an empty body let messages: [OpenGroupAPI.DirectMessage] = ((responseData.body ?? []) ?? []) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift index 9e2c0d310..4877077d2 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/Poller.swift @@ -150,6 +150,10 @@ public final class Poller { MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break + + case DatabaseError.SQLITE_ABORT: + SNLog("Failed to the database being suspended (running in background with no background task).") + break default: SNLog("Failed to deserialize envelope due to error: \(error).") } diff --git a/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift b/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift index d2d99806e..cdda8b025 100644 --- a/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift +++ b/SessionMessagingKit/Sending & Receiving/Typing Indicators/TypingIndicators.swift @@ -44,10 +44,7 @@ public class TypingIndicators { self.timestampMs = (timestampMs ?? Int64(floor(Date().timeIntervalSince1970 * 1000))) } - fileprivate func starting(_ db: Database) -> Indicator { - let direction: Direction = self.direction - let timestampMs: Int64 = self.timestampMs - + fileprivate func start(_ db: Database) { // Start the typing indicator switch direction { case .outgoing: @@ -55,27 +52,17 @@ public class TypingIndicators { case .incoming: try? ThreadTypingIndicator( - threadId: self.threadId, + threadId: threadId, timestampMs: timestampMs ) .save(db) } - // Schedule the 'stopCallback' to cancel the typing indicator - stopTimer?.invalidate() - stopTimer = Timer.scheduledTimerOnMainThread( - withTimeInterval: (direction == .outgoing ? 3 : 5), - repeats: false - ) { [weak self] _ in - Storage.shared.write { db in - self?.stoping(db) - } - } - - return self + // Refresh the timeout since we just started + refreshTimeout() } - @discardableResult fileprivate func stoping(_ db: Database) -> Indicator? { + fileprivate func stop(_ db: Database) { self.refreshTimer?.invalidate() self.refreshTimer = nil self.stopTimer?.invalidate() @@ -84,7 +71,7 @@ public class TypingIndicators { switch direction { case .outgoing: guard let thread: SessionThread = try? SessionThread.fetchOne(db, id: self.threadId) else { - return nil + return } try? MessageSender.send( @@ -99,8 +86,22 @@ public class TypingIndicators { .filter(ThreadTypingIndicator.Columns.threadId == self.threadId) .deleteAll(db) } + } + + fileprivate func refreshTimeout() { + let threadId: String = self.threadId + let direction: Direction = self.direction - return nil + // Schedule the 'stopCallback' to cancel the typing indicator + stopTimer?.invalidate() + stopTimer = Timer.scheduledTimerOnMainThread( + withTimeInterval: (direction == .outgoing ? 3 : 5), + repeats: false + ) { _ in + Storage.shared.write { db in + TypingIndicators.didStopTyping(db, threadId: threadId, direction: direction) + } + } } private func scheduleRefreshCallback(_ db: Database, shouldSend: Bool = true) { @@ -138,56 +139,76 @@ public class TypingIndicators { // MARK: - Functions - public static func didStartTyping( - _ db: Database, + public static func didStartTypingNeedsToStart( threadId: String, threadVariant: SessionThread.Variant, threadIsMessageRequest: Bool, direction: Direction, timestampMs: Int64? - ) { + ) -> Bool { switch direction { case .outgoing: - let updatedIndicator: Indicator? = ( - outgoing.wrappedValue[threadId] ?? - Indicator( - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: direction, - timestampMs: timestampMs - ) - )?.starting(db) + // If we already have an existing typing indicator for this thread then just + // refresh it's timeout (no need to do anything else) + if let existingIndicator: Indicator = outgoing.wrappedValue[threadId] { + existingIndicator.refreshTimeout() + return false + } - outgoing.mutate { $0[threadId] = updatedIndicator } + let newIndicator: Indicator? = Indicator( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: direction, + timestampMs: timestampMs + ) + newIndicator?.refreshTimeout() + + outgoing.mutate { $0[threadId] = newIndicator } + return true case .incoming: - let updatedIndicator: Indicator? = ( - incoming.wrappedValue[threadId] ?? - Indicator( - threadId: threadId, - threadVariant: threadVariant, - threadIsMessageRequest: threadIsMessageRequest, - direction: direction, - timestampMs: timestampMs - ) - )?.starting(db) + // If we already have an existing typing indicator for this thread then just + // refresh it's timeout (no need to do anything else) + if let existingIndicator: Indicator = incoming.wrappedValue[threadId] { + existingIndicator.refreshTimeout() + return false + } - incoming.mutate { $0[threadId] = updatedIndicator } + let newIndicator: Indicator? = Indicator( + threadId: threadId, + threadVariant: threadVariant, + threadIsMessageRequest: threadIsMessageRequest, + direction: direction, + timestampMs: timestampMs + ) + newIndicator?.refreshTimeout() + + incoming.mutate { $0[threadId] = newIndicator } + return true + } + } + + public static func start(_ db: Database, threadId: String, direction: Direction) { + switch direction { + case .outgoing: outgoing.wrappedValue[threadId]?.start(db) + case .incoming: incoming.wrappedValue[threadId]?.start(db) } } public static func didStopTyping(_ db: Database, threadId: String, direction: Direction) { switch direction { case .outgoing: - let updatedIndicator: Indicator? = outgoing.wrappedValue[threadId]?.stoping(db) - - outgoing.mutate { $0[threadId] = updatedIndicator } + if let indicator: Indicator = outgoing.wrappedValue[threadId] { + indicator.stop(db) + outgoing.mutate { $0[threadId] = nil } + } case .incoming: - let updatedIndicator: Indicator? = incoming.wrappedValue[threadId]?.stoping(db) - - incoming.mutate { $0[threadId] = updatedIndicator } + if let indicator: Indicator = incoming.wrappedValue[threadId] { + indicator.stop(db) + incoming.mutate { $0[threadId] = nil } + } } } } diff --git a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift index e9b5fbfbb..16b66672d 100644 --- a/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift +++ b/SessionSnodeKit/Database/Models/SnodeReceivedMessageInfo.swift @@ -68,19 +68,34 @@ public extension SnodeReceivedMessageInfo { public extension SnodeReceivedMessageInfo { static func pruneExpiredMessageHashInfo(for snode: Snode, namespace: Int, associatedWith publicKey: String) { - // Delete any expired SnodeReceivedMessageInfo values associated to a specific node + // Delete any expired SnodeReceivedMessageInfo values associated to a specific node (even though + // this runs very quickly we fetch the rowIds we want to delete from a 'read' call to avoid + // blocking the write queue since this method is called very frequently) + let rowIds: [Int64] = Storage.shared + .read { db in + // Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want + // to clear out the legacy hashes) + let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo + .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) + .isNotEmpty(db) + + guard hasNonLegacyHash else { return [] } + + return try SnodeReceivedMessageInfo + .select(Column.rowID) + .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) + .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000)) + .asRequest(of: Int64.self) + .fetchAll(db) + } + .defaulting(to: []) + + // If there are no rowIds to delete then do nothing + guard !rowIds.isEmpty else { return } + Storage.shared.write { db in - // Only prune the hashes if new hashes exist for this Snode (if they don't then we don't want - // to clear out the legacy hashes) - let hasNonLegacyHash: Bool = try SnodeReceivedMessageInfo - .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) - .isNotEmpty(db) - - guard hasNonLegacyHash else { return } - try SnodeReceivedMessageInfo - .filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace)) - .filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (Date().timeIntervalSince1970 * 1000)) + .filter(rowIds.contains(Column.rowID)) .deleteAll(db) } } diff --git a/SessionUtilitiesKit/Database/Storage.swift b/SessionUtilitiesKit/Database/Storage.swift index 87e283918..65d515f4b 100644 --- a/SessionUtilitiesKit/Database/Storage.swift +++ b/SessionUtilitiesKit/Database/Storage.swift @@ -60,6 +60,7 @@ public final class Storage { // Configure the database and create the DatabasePool for interacting with the database var config = Configuration() config.maximumReaderCount = 10 // Increase the max read connection limit - Default is 5 + config.observesSuspensionNotifications = true // Minimise `0xDEAD10CC` exceptions config.prepareDatabase { db in var keySpec: Data = Storage.getOrGenerateDatabaseKeySpec() defer { keySpec.resetBytes(in: 0.. Value? { + guard let key: Key = key else { return nil } + + return self[key] + } + func setting(_ key: Key?, _ value: Value?) -> [Key: Value] { guard let key: Key = key else { return self } diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 0c1950947..c84a11283 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -126,6 +126,9 @@ public final class JobRunner { queues.mutate { $0[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) } + // Don't start the queue if the job can't be started + guard canStartJob else { return } + // Start the job runner if needed db.afterNextTransactionCommit { _ in queues.wrappedValue[updatedJob.variant]?.start() @@ -253,6 +256,15 @@ public final class JobRunner { JobRunner.hasCompletedInitialBecomeActive.mutate { $0 = true } } + /// Calling this will clear the JobRunner queues and stop it from running new jobs, any currently executing jobs will continue to run + /// though (this means if we suspend the database it's likely that any currently running jobs will fail to complete and fail to record their + /// failure - they _should_ be picked up again the next time the app is launched) + public static func stopAndClearPendingJobs() { + queues.wrappedValue.values.forEach { queue in + queue.stopAndClearPendingJobs() + } + } + public static func isCurrentlyRunning(_ job: Job?) -> Bool { guard let job: Job = job, let jobId: Int64 = job.id else { return false } @@ -347,6 +359,8 @@ private final class JobQueue { } } + private static let deferralLoopThreshold: Int = 3 + private let type: QueueType private let executionType: ExecutionType private let qosClass: DispatchQoS @@ -376,6 +390,7 @@ private final class JobQueue { private var queue: Atomic<[Job]> = Atomic([]) private var jobsCurrentlyRunning: Atomic> = Atomic([]) private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) + private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } @@ -555,7 +570,16 @@ private final class JobQueue { runNextJob() } + fileprivate func stopAndClearPendingJobs() { + isRunning.mutate { $0 = false } + queue.mutate { $0 = [] } + deferLoopTracker.mutate { $0 = [:] } + } + private func runNextJob() { + // Ensure the queue is running (if we've stopped the queue then we shouldn't start the next job) + guard isRunning.wrappedValue else { return } + // Ensure this is running on the correct queue guard DispatchQueue.getSpecific(key: queueKey) == queueContext else { internalQueue.async { [weak self] in @@ -652,7 +676,7 @@ private final class JobQueue { return } - // Update the state to indicate it's running + // Update the state to indicate the particular job is running // // Note: We need to store 'numJobsRemaining' in it's own variable because // the 'SNLog' seems to dispatch to it's own queue which ends up getting @@ -662,7 +686,6 @@ private final class JobQueue { trigger?.invalidate() // Need to invalidate to prevent a memory leak trigger = nil } - isRunning.mutate { $0 = true } jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) numJobsRunning = jobsCurrentlyRunning.count @@ -779,13 +802,20 @@ private final class JobQueue { // `failureCount` and `nextRunTimestamp` to prevent them from endlessly running over // and over and reset their retry backoff in case they fail next time case .recurringOnLaunch, .recurringOnActive: - Storage.shared.write { db in - _ = try job - .with( - failureCount: 0, - nextRunTimestamp: 0 - ) - .saved(db) + if + let jobId: Int64 = job.id, + job.failureCount != 0 && + job.nextRunTimestamp > TimeInterval.leastNonzeroMagnitude + { + Storage.shared.write { db in + _ = try Job + .filter(id: jobId) + .updateAll( + db, + Job.Columns.failureCount.set(to: 0), + Job.Columns.nextRunTimestamp.set(to: 0) + ) + } } default: break @@ -927,8 +957,48 @@ private final class JobQueue { /// This function is called when a job neither succeeds or fails (this should only occur if the job has specific logic that makes it dependant /// on other jobs, and it should automatically manage those dependencies) private func handleJobDeferred(_ job: Job) { + var stuckInDeferLoop: Bool = false jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + deferLoopTracker.mutate { + guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else { + $0 = $0.setting( + job.id, + (1, [Date().timeIntervalSince1970]) + ) + return + } + + let timeNow: TimeInterval = Date().timeIntervalSince1970 + stuckInDeferLoop = ( + lastRecord.count >= JobQueue.deferralLoopThreshold && + (timeNow - lastRecord.times[0]) < CGFloat(lastRecord.count) + ) + + $0 = $0.setting( + job.id, + ( + lastRecord.count + 1, + // Only store the last 'deferralLoopThreshold' times to ensure we aren't running faster + // than one loop per second + lastRecord.times.suffix(JobQueue.deferralLoopThreshold - 1) + [timeNow] + ) + ) + } + + // It's possible (by introducing bugs) to create a loop where a Job tries to run and immediately + // defers itself but then attempts to run again (resulting in an infinite loop); this won't block + // the app since it's on a background thread but can result in 100% of a CPU being used (and a + // battery drain) + // + // This code will maintain an in-memory store for any jobs which are deferred too quickly (ie. + // more than 'deferralLoopThreshold' times within 'deferralLoopThreshold' seconds) + guard !stuckInDeferLoop else { + deferLoopTracker.mutate { $0 = $0.removingValue(forKey: job.id) } + handleJobFailed(job, error: JobRunnerError.possibleDeferralLoop, permanentFailure: false) + return + } + internalQueue.async { [weak self] in self?.runNextJob() } diff --git a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift index 15e2b23a2..8d015095d 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunnerError.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunnerError.swift @@ -11,4 +11,6 @@ public enum JobRunnerError: Error { case missingRequiredDetails case missingDependencies + + case possibleDeferralLoop } From 3f63a44c319170c6330108063c39957b7ac7fab0 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Fri, 5 Aug 2022 17:11:14 +1000 Subject: [PATCH 2/2] Increased the build number --- Session.xcodeproj/project.pbxproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index 6c20b39d0..c7ca1ac9c 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -6818,7 +6818,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 363; + CURRENT_PROJECT_VERSION = 365; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)", @@ -6890,7 +6890,7 @@ CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements; CODE_SIGN_IDENTITY = "iPhone Developer"; "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; - CURRENT_PROJECT_VERSION = 363; + CURRENT_PROJECT_VERSION = 365; DEVELOPMENT_TEAM = SUQ8J2PCT7; FRAMEWORK_SEARCH_PATHS = ( "$(inherited)",