diff --git a/Session.xcodeproj/project.pbxproj b/Session.xcodeproj/project.pbxproj index 015f4439c..45e8d16d5 100644 --- a/Session.xcodeproj/project.pbxproj +++ b/Session.xcodeproj/project.pbxproj @@ -651,6 +651,8 @@ FD3C907127E445E500CD579F /* MessageReceiverDecryptionSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3C907027E445E500CD579F /* MessageReceiverDecryptionSpec.swift */; }; FD3E0C84283B5835002A425C /* SessionThreadViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3E0C83283B5835002A425C /* SessionThreadViewModel.swift */; }; FD42F9A8285064B800A0C77D /* PushNotificationAPI.swift in Sources */ = {isa = PBXBuildFile; fileRef = C33FDBDE255A581900E217F9 /* PushNotificationAPI.swift */; }; + FD432432299C6933008A0213 /* _011_AddPendingReadReceipts.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */; }; + FD432434299C6985008A0213 /* PendingReadReceipt.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD432433299C6985008A0213 /* PendingReadReceipt.swift */; }; FD4B200E283492210034334B /* InsetLockableTableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD4B200D283492210034334B /* InsetLockableTableView.swift */; }; FD52090028AF6153006098F6 /* OWSBackgroundTask.m in Sources */ = {isa = PBXBuildFile; fileRef = C33FDC1B255A581F00E217F9 /* OWSBackgroundTask.m */; }; FD52090128AF61BA006098F6 /* OWSBackgroundTask.h in Headers */ = {isa = PBXBuildFile; fileRef = C33FDB38255A580B00E217F9 /* OWSBackgroundTask.h */; settings = {ATTRIBUTES = (Public, ); }; }; @@ -1735,6 +1737,8 @@ FD3C907027E445E500CD579F /* MessageReceiverDecryptionSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageReceiverDecryptionSpec.swift; sourceTree = ""; }; FD3C907427E83AC200CD579F /* OpenGroupServerIdLookup.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OpenGroupServerIdLookup.swift; sourceTree = ""; }; FD3E0C83283B5835002A425C /* SessionThreadViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionThreadViewModel.swift; sourceTree = ""; }; + FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _011_AddPendingReadReceipts.swift; sourceTree = ""; }; + FD432433299C6985008A0213 /* PendingReadReceipt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PendingReadReceipt.swift; sourceTree = ""; }; FD4B200D283492210034334B /* InsetLockableTableView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = InsetLockableTableView.swift; sourceTree = ""; }; FD52090228B4680F006098F6 /* RadioButton.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RadioButton.swift; sourceTree = ""; }; FD52090428B4915F006098F6 /* PrivacySettingsViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PrivacySettingsViewModel.swift; sourceTree = ""; }; @@ -3524,6 +3528,7 @@ FDE77F6A280FEB28002CFC5D /* ControlMessageProcessRecord.swift */, FD5C7308285007920029977D /* BlindedIdLookup.swift */, FD09B7E6288670FD00ED0B66 /* Reaction.swift */, + FD432433299C6985008A0213 /* PendingReadReceipt.swift */, ); path = Models; sourceTree = ""; @@ -3541,6 +3546,7 @@ FD09B7E4288670BB00ED0B66 /* _008_EmojiReacts.swift */, 7BAA7B6528D2DE4700AE1489 /* _009_OpenGroupPermission.swift */, FD7115F128C6CB3900B47552 /* _010_AddThreadIdToFTS.swift */, + FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */, ); path = Migrations; sourceTree = ""; @@ -5490,6 +5496,7 @@ FD716E6428502DDD00C96BF4 /* CallManagerProtocol.swift in Sources */, FDC438C727BB6DF000C60D73 /* DirectMessage.swift in Sources */, FDC4384F27B4804F00C60D73 /* Header.swift in Sources */, + FD432434299C6985008A0213 /* PendingReadReceipt.swift in Sources */, FDC4381727B32EC700C60D73 /* Personalization.swift in Sources */, FD245C51285065CC00B966DD /* MessageReceiver.swift in Sources */, FD245C652850665400B966DD /* ClosedGroupControlMessage.swift in Sources */, @@ -5551,6 +5558,7 @@ FDC438C127BB4E6800C60D73 /* SMKDependencies.swift in Sources */, FDC4383827B3863200C60D73 /* VersionResponse.swift in Sources */, B806ECA126C4A7E4008BDA44 /* WebRTCSession+UI.swift in Sources */, + FD432432299C6933008A0213 /* _011_AddPendingReadReceipts.swift in Sources */, 7BCD116C27016062006330F1 /* WebRTCSession+DataChannel.swift in Sources */, FD5C72F9284F0E880029977D /* MessageReceiver+TypingIndicators.swift in Sources */, FD5C7303284F0FA50029977D /* MessageReceiver+Calls.swift in Sources */, diff --git a/Session/Conversations/ConversationViewModel.swift b/Session/Conversations/ConversationViewModel.swift index 4f3a113cb..4dfd388da 100644 --- a/Session/Conversations/ConversationViewModel.swift +++ b/Session/Conversations/ConversationViewModel.swift @@ -200,7 +200,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate { ), PagedData.ObservedChanges( table: RecipientState.self, - columns: [.state, .mostRecentFailureText], + columns: [.state, .readTimestampMs, .mostRecentFailureText], joinToPagedType: { let interaction: TypedTableAlias = TypedTableAlias() let recipientState: TypedTableAlias = TypedTableAlias() diff --git a/Session/Settings/QRCodeVC.swift b/Session/Settings/QRCodeVC.swift index 5884a535e..e9e91b78a 100644 --- a/Session/Settings/QRCodeVC.swift +++ b/Session/Settings/QRCodeVC.swift @@ -4,6 +4,7 @@ import UIKit import AVFoundation import Curve25519Kit import SessionUIKit +import SessionMessagingKit import SessionUtilitiesKit final class QRCodeVC : BaseVC, UIPageViewControllerDataSource, UIPageViewControllerDelegate, QRScannerDelegate { diff --git a/SessionMessagingKit/Configuration.swift b/SessionMessagingKit/Configuration.swift index 868961582..27b375f99 100644 --- a/SessionMessagingKit/Configuration.swift +++ b/SessionMessagingKit/Configuration.swift @@ -24,7 +24,8 @@ public enum SNMessagingKit { // Just to make the external API nice [ _008_EmojiReacts.self, _009_OpenGroupPermission.self, - _010_AddThreadIdToFTS.self + _010_AddThreadIdToFTS.self, + _011_AddPendingReadReceipts.self ] ] ) diff --git a/SessionMessagingKit/Database/Migrations/_011_AddPendingReadReceipts.swift b/SessionMessagingKit/Database/Migrations/_011_AddPendingReadReceipts.swift new file mode 100644 index 000000000..69e231b49 --- /dev/null +++ b/SessionMessagingKit/Database/Migrations/_011_AddPendingReadReceipts.swift @@ -0,0 +1,41 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB +import SessionUtilitiesKit + +/// This migration adds a table to track pending read receipts (it's possible to receive a read receipt message before getting the original +/// message due to how one-to-one conversations work, by storing pending read receipts we should be able to prevent this case) +enum _011_AddPendingReadReceipts: Migration { + static let target: TargetMigrations.Identifier = .messagingKit + static let identifier: String = "AddPendingReadReceipts" + static let needsConfigSync: Bool = false + static let minExpectedRunDuration: TimeInterval = 0.1 + + static func migrate(_ db: Database) throws { + // Can't actually alter a virtual table in SQLite so we need to drop and recreate it, + // luckily this is actually pretty quick + if try db.tableExists(Interaction.fullTextSearchTableName) { + try db.drop(table: Interaction.fullTextSearchTableName) + try db.dropFTS5SynchronizationTriggers(forTable: Interaction.fullTextSearchTableName) + } + + try db.create(table: PendingReadReceipt.self) { t in + t.column(.threadId, .text) + .notNull() + .indexed() // Quicker querying + .references(SessionThread.self, onDelete: .cascade) // Delete if Thread deleted + t.column(.interactionTimestampMs, .integer) + .notNull() + .indexed() // Quicker querying + t.column(.readTimestampMs, .integer) + .notNull() + t.column(.serverExpirationTimestamp, .double) + .notNull() + + t.primaryKey([.threadId, .interactionTimestampMs]) + } + + Storage.update(progress: 1, for: self, in: target) // In case this is the last migration + } +} diff --git a/SessionMessagingKit/Database/Models/Interaction.swift b/SessionMessagingKit/Database/Models/Interaction.swift index 0193c47c0..fe244b8f0 100644 --- a/SessionMessagingKit/Database/Models/Interaction.swift +++ b/SessionMessagingKit/Database/Models/Interaction.swift @@ -450,26 +450,33 @@ public extension Interaction { trySendReadReceipt: Bool ) throws { guard let interactionId: Int64 = interactionId else { return } + + struct InteractionReadInfo: Decodable, FetchableRecord { + let id: Int64 + let variant: Interaction.Variant + let timestampMs: Int64 + let wasRead: Bool + } // Once all of the below is done schedule the jobs - func scheduleJobs(interactionIds: [Int64]) { + func scheduleJobs(interactionInfo: [InteractionReadInfo]) { // Add the 'DisappearingMessagesJob' if needed - this will update any expiring // messages `expiresStartedAtMs` values JobRunner.upsert( db, job: DisappearingMessagesJob.updateNextRunIfNeeded( db, - interactionIds: interactionIds, + interactionIds: interactionInfo.map { $0.id }, startedAtMs: TimeInterval(SnodeAPI.currentOffsetTimestampMs()) ) ) // Clear out any notifications for the interactions we mark as read Environment.shared?.notificationsManager.wrappedValue?.cancelNotifications( - identifiers: interactionIds - .map { interactionId in + identifiers: interactionInfo + .map { interactionInfo in Interaction.notificationIdentifier( - for: interactionId, + for: interactionInfo.id, threadId: threadId, shouldGroupMessagesForThread: false ) @@ -482,43 +489,54 @@ public extension Interaction { ) // If we want to send read receipts and it's a contact thread then try to add the - // 'SendReadReceiptsJob' + // 'SendReadReceiptsJob' for and unread messages that weren't outgoing if trySendReadReceipt && threadVariant == .contact { JobRunner.upsert( db, job: SendReadReceiptsJob.createOrUpdateIfNeeded( db, threadId: threadId, - interactionIds: interactionIds + interactionIds: interactionInfo + .filter { !$0.wasRead && $0.variant != .standardOutgoing } + .map { $0.id } ) ) } } - // If we aren't including older interactions then update and save the current one - struct InteractionReadInfo: Decodable, FetchableRecord { - let timestampMs: Int64 - let wasRead: Bool - } - // Since there is no guarantee on the order messages are inserted into the database // fetch the timestamp for the interaction and set everything before that as read let maybeInteractionInfo: InteractionReadInfo? = try Interaction - .select(.timestampMs, .wasRead) + .select(.id, .variant, .timestampMs, .wasRead) .filter(id: interactionId) .asRequest(of: InteractionReadInfo.self) .fetchOne(db) + // If we aren't including older interactions then update and save the current one guard includingOlder, let interactionInfo: InteractionReadInfo = maybeInteractionInfo else { // Only mark as read and trigger the subsequent jobs if the interaction is // actually not read (no point updating and triggering db changes otherwise) - guard maybeInteractionInfo?.wasRead == false else { return } + guard + maybeInteractionInfo?.wasRead == false, + let variant: Variant = try Interaction + .filter(id: interactionId) + .select(.variant) + .asRequest(of: Variant.self) + .fetchOne(db) + else { return } _ = try Interaction .filter(id: interactionId) .updateAll(db, Columns.wasRead.set(to: true)) - scheduleJobs(interactionIds: [interactionId]) + scheduleJobs(interactionInfo: [ + InteractionReadInfo( + id: interactionId, + variant: variant, + timestampMs: 0, + wasRead: false + ) + ]) return } @@ -526,16 +544,16 @@ public extension Interaction { .filter(Interaction.Columns.threadId == threadId) .filter(Interaction.Columns.timestampMs <= interactionInfo.timestampMs) .filter(Interaction.Columns.wasRead == false) - let interactionIdsToMarkAsRead: [Int64] = try interactionQuery - .select(.id) - .asRequest(of: Int64.self) + let interactionInfoToMarkAsRead: [InteractionReadInfo] = try interactionQuery + .select(.id, .variant, .timestampMs, .wasRead) + .asRequest(of: InteractionReadInfo.self) .fetchAll(db) // If there are no other interactions to mark as read then just schedule the jobs // for this interaction (need to ensure the disapeparing messages run for sync'ed // outgoing messages which will always have 'wasRead' as false) - guard !interactionIdsToMarkAsRead.isEmpty else { - scheduleJobs(interactionIds: [interactionId]) + guard !interactionInfoToMarkAsRead.isEmpty else { + scheduleJobs(interactionInfo: [interactionInfo]) return } @@ -543,27 +561,71 @@ public extension Interaction { try interactionQuery.updateAll(db, Columns.wasRead.set(to: true)) // Retrieve the interaction ids we want to update - scheduleJobs(interactionIds: interactionIdsToMarkAsRead) + scheduleJobs(interactionInfo: interactionInfoToMarkAsRead) } /// This method flags sent messages as read for the specified recipients /// /// **Note:** This method won't update the 'wasRead' flag (it will be updated via the above method) - static func markAsRead(_ db: Database, recipientId: String, timestampMsValues: [Double], readTimestampMs: Double) throws { - guard db[.areReadReceiptsEnabled] == true else { return } + @discardableResult static func markAsRead( + _ db: Database, + recipientId: String, + timestampMsValues: [Int64], + readTimestampMs: Int64 + ) throws -> Set { + guard db[.areReadReceiptsEnabled] == true else { return [] } - try RecipientState + // Update the read state + let rowIds: [Int64] = try RecipientState + .select(Column.rowID) .filter(RecipientState.Columns.recipientId == recipientId) .joining( required: RecipientState.interaction - .filter(Columns.variant == Variant.standardOutgoing) .filter(timestampMsValues.contains(Columns.timestampMs)) + .filter(Columns.variant == Variant.standardOutgoing) ) + .asRequest(of: Int64.self) + .fetchAll(db) + + // If there were no 'rowIds' then no need to run the below queries, all of the timestamps + // and for pending read receipts + guard !rowIds.isEmpty else { return timestampMsValues.asSet() } + + // Update the 'readTimestampMs' if it doesn't match (need to do this to prevent + // the UI update from being triggered for a redundant update) + try RecipientState + .filter(rowIds.contains(Column.rowID)) + .filter(RecipientState.Columns.readTimestampMs == nil) + .updateAll( + db, + RecipientState.Columns.readTimestampMs.set(to: readTimestampMs) + ) + + // If the message still appeared to be sending then mark it as sent + try RecipientState + .filter(rowIds.contains(Column.rowID)) + .filter(RecipientState.Columns.state == RecipientState.State.sending) .updateAll( db, - RecipientState.Columns.readTimestampMs.set(to: readTimestampMs), RecipientState.Columns.state.set(to: RecipientState.State.sent) ) + + // Retrieve the set of timestamps which were updated + let timestampsUpdated: Set = try Interaction + .select(Columns.timestampMs) + .filter(timestampMsValues.contains(Columns.timestampMs)) + .filter(Columns.variant == Variant.standardOutgoing) + .joining( + required: Interaction.recipientStates + .filter(rowIds.contains(Column.rowID)) + ) + .asRequest(of: Int64.self) + .fetchSet(db) + + // Return the timestamps which weren't updated + return timestampMsValues + .asSet() + .subtracting(timestampsUpdated) } } diff --git a/SessionMessagingKit/Database/Models/PendingReadReceipt.swift b/SessionMessagingKit/Database/Models/PendingReadReceipt.swift new file mode 100644 index 000000000..f01531a70 --- /dev/null +++ b/SessionMessagingKit/Database/Models/PendingReadReceipt.swift @@ -0,0 +1,44 @@ +// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved. + +import Foundation +import GRDB +import SessionUtilitiesKit + +public struct PendingReadReceipt: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible { + public static var databaseTableName: String { "pendingReadReceipt" } + public static let threadForeignKey = ForeignKey([Columns.threadId], to: [SessionThread.Columns.id]) + + public typealias Columns = CodingKeys + public enum CodingKeys: String, CodingKey, ColumnExpression { + case threadId + case interactionTimestampMs + case readTimestampMs + case serverExpirationTimestamp + } + + /// The id for the thread this ReadReceipt belongs to + public let threadId: String + + /// The timestamp in milliseconds since epoch for the interaction this read receipt relates to + public let interactionTimestampMs: Int64 + + /// The timestamp in milliseconds since epoch that the interaction this read receipt relates to was read + public let readTimestampMs: Int64 + + /// The timestamp for when this message will expire on the server (will be used for garbage collection) + public let serverExpirationTimestamp: TimeInterval + + // MARK: - Initialization + + public init( + threadId: String, + interactionTimestampMs: Int64, + readTimestampMs: Int64, + serverExpirationTimestamp: TimeInterval + ) { + self.threadId = threadId + self.interactionTimestampMs = interactionTimestampMs + self.readTimestampMs = readTimestampMs + self.serverExpirationTimestamp = serverExpirationTimestamp + } +} diff --git a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift index e8d50e5c2..512e61bfd 100644 --- a/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift +++ b/SessionMessagingKit/Jobs/Types/GarbageCollectionJob.swift @@ -41,7 +41,7 @@ public enum GarbageCollectionJob: JobExecutor { /// are shown) let lastGarbageCollection: Date = UserDefaults.standard[.lastGarbageCollection] .defaulting(to: Date.distantPast) - let finalTypesToCollection: Set = { + let finalTypesToCollect: Set = { guard job.behaviour != .recurringOnActive || Date().timeIntervalSince(lastGarbageCollection) > (23 * 60 * 60) @@ -60,20 +60,20 @@ public enum GarbageCollectionJob: JobExecutor { Storage.shared.writeAsync( updates: { db in /// Remove any typing indicators - if finalTypesToCollection.contains(.threadTypingIndicators) { + if finalTypesToCollect.contains(.threadTypingIndicators) { _ = try ThreadTypingIndicator .deleteAll(db) } /// Remove any expired controlMessageProcessRecords - if finalTypesToCollection.contains(.expiredControlMessageProcessRecords) { + if finalTypesToCollect.contains(.expiredControlMessageProcessRecords) { _ = try ControlMessageProcessRecord .filter(ControlMessageProcessRecord.Columns.serverExpirationTimestamp <= timestampNow) .deleteAll(db) } /// Remove any old open group messages - open group messages which are older than six months - if finalTypesToCollection.contains(.oldOpenGroupMessages) && db[.trimOpenGroupMessagesOlderThanSixMonths] { + if finalTypesToCollect.contains(.oldOpenGroupMessages) && db[.trimOpenGroupMessagesOlderThanSixMonths] { let interaction: TypedTableAlias = TypedTableAlias() let thread: TypedTableAlias = TypedTableAlias() let threadIdLiteral: SQL = SQL(stringLiteral: Interaction.Columns.threadId.name) @@ -104,7 +104,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned jobs - jobs which have had their threads or interactions removed - if finalTypesToCollection.contains(.orphanedJobs) { + if finalTypesToCollect.contains(.orphanedJobs) { let job: TypedTableAlias = TypedTableAlias() let thread: TypedTableAlias = TypedTableAlias() let interaction: TypedTableAlias = TypedTableAlias() @@ -130,7 +130,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned link previews - link previews which have no interactions with matching url & rounded timestamps - if finalTypesToCollection.contains(.orphanedLinkPreviews) { + if finalTypesToCollect.contains(.orphanedLinkPreviews) { let linkPreview: TypedTableAlias = TypedTableAlias() let interaction: TypedTableAlias = TypedTableAlias() @@ -150,7 +150,7 @@ public enum GarbageCollectionJob: JobExecutor { /// Orphaned open groups - open groups which are no longer associated to a thread (except for the session-run ones for which /// we want cached image data even if the user isn't in the group) - if finalTypesToCollection.contains(.orphanedOpenGroups) { + if finalTypesToCollect.contains(.orphanedOpenGroups) { let openGroup: TypedTableAlias = TypedTableAlias() let thread: TypedTableAlias = TypedTableAlias() @@ -169,7 +169,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned open group capabilities - capabilities which have no existing open groups with the same server - if finalTypesToCollection.contains(.orphanedOpenGroupCapabilities) { + if finalTypesToCollect.contains(.orphanedOpenGroupCapabilities) { let capability: TypedTableAlias = TypedTableAlias() let openGroup: TypedTableAlias = TypedTableAlias() @@ -185,7 +185,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned blinded id lookups - lookups which have no existing threads or approval/block settings for either blinded/un-blinded id - if finalTypesToCollection.contains(.orphanedBlindedIdLookups) { + if finalTypesToCollect.contains(.orphanedBlindedIdLookups) { let blindedIdLookup: TypedTableAlias = TypedTableAlias() let thread: TypedTableAlias = TypedTableAlias() let contact: TypedTableAlias = TypedTableAlias() @@ -213,7 +213,7 @@ public enum GarbageCollectionJob: JobExecutor { /// Approved blinded contact records - once a blinded contact has been approved there is no need to keep the blinded /// contact record around anymore - if finalTypesToCollection.contains(.approvedBlindedContactRecords) { + if finalTypesToCollect.contains(.approvedBlindedContactRecords) { let contact: TypedTableAlias = TypedTableAlias() let blindedIdLookup: TypedTableAlias = TypedTableAlias() @@ -232,7 +232,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned attachments - attachments which have no related interactions, quotes or link previews - if finalTypesToCollection.contains(.orphanedAttachments) { + if finalTypesToCollect.contains(.orphanedAttachments) { let attachment: TypedTableAlias = TypedTableAlias() let quote: TypedTableAlias = TypedTableAlias() let linkPreview: TypedTableAlias = TypedTableAlias() @@ -255,7 +255,7 @@ public enum GarbageCollectionJob: JobExecutor { """) } - if finalTypesToCollection.contains(.orphanedProfiles) { + if finalTypesToCollect.contains(.orphanedProfiles) { let profile: TypedTableAlias = TypedTableAlias() let thread: TypedTableAlias = TypedTableAlias() let interaction: TypedTableAlias = TypedTableAlias() @@ -289,6 +289,12 @@ public enum GarbageCollectionJob: JobExecutor { ) """) } + + if finalTypesToCollect.contains(.expiredPendingReadReceipts) { + _ = try PendingReadReceipt + .filter(PendingReadReceipt.Columns.serverExpirationTimestamp <= timestampNow) + .deleteAll(db) + } }, completion: { _, _ in // Dispatch async so we can swap from the write queue to a read one (we are done writing) @@ -304,7 +310,7 @@ public enum GarbageCollectionJob: JobExecutor { var profileAvatarFilenames: Set = [] /// Orphaned attachment files - attachment files which don't have an associated record in the database - if finalTypesToCollection.contains(.orphanedAttachmentFiles) { + if finalTypesToCollect.contains(.orphanedAttachmentFiles) { /// **Note:** Thumbnails are stored in the `NSCachesDirectory` directory which should be automatically manage /// it's own garbage collection so we can just ignore it according to the various comments in the following stack overflow /// post, the directory will be cleared during app updates as well as if the system is running low on memory (if the app isn't running) @@ -317,7 +323,7 @@ public enum GarbageCollectionJob: JobExecutor { } /// Orphaned profile avatar files - profile avatar files which don't have an associated record in the database - if finalTypesToCollection.contains(.orphanedProfileAvatars) { + if finalTypesToCollect.contains(.orphanedProfileAvatars) { profileAvatarFilenames = try Profile .select(.profilePictureFileName) .filter(Profile.Columns.profilePictureFileName != nil) @@ -340,7 +346,7 @@ public enum GarbageCollectionJob: JobExecutor { var deletionErrors: [Error] = [] // Orphaned attachment files (actual deletion) - if finalTypesToCollection.contains(.orphanedAttachmentFiles) { + if finalTypesToCollect.contains(.orphanedAttachmentFiles) { // Note: Looks like in order to recursively look through files we need to use the // enumerator method let fileEnumerator = FileManager.default.enumerator( @@ -384,7 +390,7 @@ public enum GarbageCollectionJob: JobExecutor { } // Orphaned profile avatar files (actual deletion) - if finalTypesToCollection.contains(.orphanedProfileAvatars) { + if finalTypesToCollect.contains(.orphanedProfileAvatars) { let allAvatarProfileFilenames: Set = (try? FileManager.default .contentsOfDirectory(atPath: ProfileManager.sharedDataProfileAvatarsDirPath)) .defaulting(to: []) @@ -442,6 +448,7 @@ extension GarbageCollectionJob { case orphanedAttachments case orphanedAttachmentFiles case orphanedProfileAvatars + case expiredPendingReadReceipts } public struct Details: Codable { diff --git a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift index a9621fa83..907f6af8d 100644 --- a/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageReceiveJob.swift @@ -36,6 +36,7 @@ public enum MessageReceiveJob: JobExecutor { try MessageReceiver.handle( db, message: messageInfo.message, + serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), openGroupId: nil ) @@ -104,30 +105,36 @@ extension MessageReceiveJob { private enum CodingKeys: String, CodingKey { case message case variant + case serverExpirationTimestamp case serializedProtoData } public let message: Message public let variant: Message.Variant + public let serverExpirationTimestamp: TimeInterval? public let serializedProtoData: Data public init( message: Message, variant: Message.Variant, + serverExpirationTimestamp: TimeInterval?, proto: SNProtoContent ) throws { self.message = message self.variant = variant + self.serverExpirationTimestamp = serverExpirationTimestamp self.serializedProtoData = try proto.serializedData() } private init( message: Message, variant: Message.Variant, + serverExpirationTimestamp: TimeInterval?, serializedProtoData: Data ) { self.message = message self.variant = variant + self.serverExpirationTimestamp = serverExpirationTimestamp self.serializedProtoData = serializedProtoData } @@ -144,6 +151,7 @@ extension MessageReceiveJob { self = MessageInfo( message: try variant.decode(from: container, forKey: .message), variant: variant, + serverExpirationTimestamp: try? container.decode(TimeInterval.self, forKey: .serverExpirationTimestamp), serializedProtoData: try container.decode(Data.self, forKey: .serializedProtoData) ) } @@ -158,6 +166,7 @@ extension MessageReceiveJob { try container.encode(message, forKey: .message) try container.encode(variant, forKey: .variant) + try container.encodeIfPresent(serverExpirationTimestamp, forKey: .serverExpirationTimestamp) try container.encode(serializedProtoData, forKey: .serializedProtoData) } } diff --git a/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift b/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift index f4b1c23d7..bdb684869 100644 --- a/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift +++ b/SessionMessagingKit/Jobs/Types/SendReadReceiptsJob.swift @@ -105,6 +105,7 @@ public extension SendReadReceiptsJob { /// ensure that is done correctly beforehand @discardableResult static func createOrUpdateIfNeeded(_ db: Database, threadId: String, interactionIds: [Int64]) -> Job? { guard db[.areReadReceiptsEnabled] == true else { return nil } + guard !interactionIds.isEmpty else { return nil } // Retrieve the timestampMs values for the specified interactions let timestampMsValues: [Int64] = (try? Interaction diff --git a/SessionMessagingKit/Messages/Message.swift b/SessionMessagingKit/Messages/Message.swift index befca3712..c6e4555db 100644 --- a/SessionMessagingKit/Messages/Message.swift +++ b/SessionMessagingKit/Messages/Message.swift @@ -547,6 +547,7 @@ public extension Message { try MessageReceiveJob.Details.MessageInfo( message: message, variant: variant, + serverExpirationTimestamp: serverExpirationTimestamp, proto: proto ) ) diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index d5baf0d36..928ffaaea 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -576,6 +576,7 @@ public final class OpenGroupManager: NSObject { try MessageReceiver.handle( db, message: messageInfo.message, + serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), openGroupId: openGroup.id, dependencies: dependencies @@ -739,6 +740,7 @@ public final class OpenGroupManager: NSObject { try MessageReceiver.handle( db, message: messageInfo.message, + serverExpirationTimestamp: messageInfo.serverExpirationTimestamp, associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData), openGroupId: nil, // Intentionally nil as they are technically not open group messages dependencies: dependencies diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ReadReceipts.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ReadReceipts.swift index 913610e83..3c3ab13c0 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ReadReceipts.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ReadReceipts.swift @@ -4,16 +4,32 @@ import Foundation import GRDB extension MessageReceiver { - internal static func handleReadReceipt(_ db: Database, message: ReadReceipt) throws { + internal static func handleReadReceipt( + _ db: Database, + message: ReadReceipt, + serverExpirationTimestamp: TimeInterval? + ) throws { guard let sender: String = message.sender else { return } - guard let timestampMsValues: [Double] = message.timestamps?.map({ Double($0) }) else { return } - guard let readTimestampMs: Double = message.receivedTimestamp.map({ Double($0) }) else { return } + guard let timestampMsValues: [Int64] = message.timestamps?.map({ Int64($0) }) else { return } + guard let readTimestampMs: Int64 = message.receivedTimestamp.map({ Int64($0) }) else { return } - try Interaction.markAsRead( + let pendingTimestampMs: Set = try Interaction.markAsRead( db, recipientId: sender, timestampMsValues: timestampMsValues, readTimestampMs: readTimestampMs ) + + guard !pendingTimestampMs.isEmpty else { return } + + // We have some pending read receipts so store them in the database + try pendingTimestampMs.forEach { timestampMs in + try PendingReadReceipt( + threadId: sender, + interactionTimestampMs: timestampMs, + readTimestampMs: readTimestampMs, + serverExpirationTimestamp: (serverExpirationTimestamp ?? 0) + ).save(db) + } } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift index cb3e55df7..f0f96e1bb 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift @@ -161,6 +161,7 @@ extension MessageReceiver { db, thread: thread, interactionId: existingInteractionId, + messageSentTimestamp: messageSentTimestamp, variant: variant, syncTarget: message.syncTarget ) @@ -178,6 +179,7 @@ extension MessageReceiver { db, thread: thread, interactionId: interactionId, + messageSentTimestamp: messageSentTimestamp, variant: variant, syncTarget: message.syncTarget ) @@ -363,6 +365,7 @@ extension MessageReceiver { _ db: Database, thread: SessionThread, interactionId: Int64, + messageSentTimestamp: TimeInterval, variant: Interaction.Variant, syncTarget: String? ) throws { @@ -371,6 +374,7 @@ extension MessageReceiver { // Immediately update any existing outgoing message 'RecipientState' records to be 'sent' _ = try? RecipientState .filter(RecipientState.Columns.interactionId == interactionId) + .filter(RecipientState.Columns.state != RecipientState.State.sent) .updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.sent)) // Create any addiitonal 'RecipientState' records as needed @@ -415,5 +419,22 @@ extension MessageReceiver { includingOlder: true, trySendReadReceipt: true ) + + // Process any PendingReadReceipt values + let maybePendingReadReceipt: PendingReadReceipt? = try PendingReadReceipt + .filter(PendingReadReceipt.Columns.threadId == thread.id) + .filter(PendingReadReceipt.Columns.interactionTimestampMs == Int64(messageSentTimestamp * 1000)) + .fetchOne(db) + + if let pendingReadReceipt: PendingReadReceipt = maybePendingReadReceipt { + try Interaction.markAsRead( + db, + recipientId: thread.id, + timestampMsValues: [pendingReadReceipt.interactionTimestampMs], + readTimestampMs: pendingReadReceipt.readTimestampMs + ) + + _ = try pendingReadReceipt.delete(db) + } } } diff --git a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift index 6c1a3dc40..2f00d7d4d 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageReceiver.swift @@ -179,13 +179,18 @@ public enum MessageReceiver { public static func handle( _ db: Database, message: Message, + serverExpirationTimestamp: TimeInterval?, associatedWithProto proto: SNProtoContent, openGroupId: String?, dependencies: SMKDependencies = SMKDependencies() ) throws { switch message { case let message as ReadReceipt: - try MessageReceiver.handleReadReceipt(db, message: message) + try MessageReceiver.handleReadReceipt( + db, + message: message, + serverExpirationTimestamp: serverExpirationTimestamp + ) case let message as TypingIndicator: try MessageReceiver.handleTypingIndicator(db, message: message) diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index 188818755..85746de0b 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -641,6 +641,7 @@ public final class MessageSender { // Mark the message as sent try interaction.recipientStates + .filter(RecipientState.Columns.state != RecipientState.State.sent) .updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.sent)) // Start the disappearing messages timer if needed @@ -773,19 +774,21 @@ public final class MessageSender { if let message = message as? VisibleMessage { message.syncTarget = publicKey } if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey } - JobRunner.add( - db, - job: Job( - variant: .messageSend, - threadId: threadId, - interactionId: interactionId, - details: MessageSendJob.Details( - destination: .contact(publicKey: currentUserPublicKey), - message: message, - isSyncMessage: true + Storage.shared.write { db in + JobRunner.add( + db, + job: Job( + variant: .messageSend, + threadId: threadId, + interactionId: interactionId, + details: MessageSendJob.Details( + destination: .contact(publicKey: currentUserPublicKey), + message: message, + isSyncMessage: true + ) ) ) - ) + } } } }