Merge pull request #927 from mpretty-cyro/fix/snode-info-deduping

Fixed an issue where the messages might not get reprocessed when they should
This commit is contained in:
Morgan Pretty 2023-10-12 16:14:30 +11:00 committed by GitHub
commit aec2aed81f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 107 additions and 35 deletions

View File

@ -648,6 +648,7 @@
FD6A7A692818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A682818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift */; };
FD6A7A6B2818C17C00035AC1 /* UpdateProfilePictureJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A6A2818C17C00035AC1 /* UpdateProfilePictureJob.swift */; };
FD6A7A6D2818C61500035AC1 /* _002_SetupStandardJobs.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */; };
FD6DF00B2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */; };
FD6E4C8A2A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6E4C892A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift */; };
FD705A92278D051200F16121 /* ReusableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD705A91278D051200F16121 /* ReusableView.swift */; };
FD7115EB28C5D78E00B47552 /* ThreadSettingsViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD7115EA28C5D78E00B47552 /* ThreadSettingsViewModel.swift */; };
@ -1759,6 +1760,7 @@
FD6A7A682818BE7300035AC1 /* RetrieveDefaultOpenGroupRoomsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RetrieveDefaultOpenGroupRoomsJob.swift; sourceTree = "<group>"; };
FD6A7A6A2818C17C00035AC1 /* UpdateProfilePictureJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UpdateProfilePictureJob.swift; sourceTree = "<group>"; };
FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _002_SetupStandardJobs.swift; sourceTree = "<group>"; };
FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _005_AddSnodeReveivedMessageInfoPrimaryKey.swift; sourceTree = "<group>"; };
FD6E4C892A1AEE4700C7C243 /* LegacyUnsubscribeRequest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LegacyUnsubscribeRequest.swift; sourceTree = "<group>"; };
FD705A91278D051200F16121 /* ReusableView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReusableView.swift; sourceTree = "<group>"; };
FD7115EA28C5D78E00B47552 /* ThreadSettingsViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadSettingsViewModel.swift; sourceTree = "<group>"; };
@ -3670,6 +3672,7 @@
FD6A7A6C2818C61500035AC1 /* _002_SetupStandardJobs.swift */,
FD17D7A327F40F8100122BE0 /* _003_YDBToGRDBMigration.swift */,
FD39353528F7C3390084DADA /* _004_FlagMessageHashAsDeletedOrInvalid.swift */,
FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */,
);
path = Migrations;
sourceTree = "<group>";
@ -5765,6 +5768,7 @@
FD39353628F7C3390084DADA /* _004_FlagMessageHashAsDeletedOrInvalid.swift in Sources */,
FDF8489429405C1B007DCAE5 /* SnodeAPI.swift in Sources */,
FDF848C829405C5B007DCAE5 /* ONSResolveRequest.swift in Sources */,
FD6DF00B2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift in Sources */,
C3C2A5C2255385EE00C340D1 /* Configuration.swift in Sources */,
FDF848C929405C5B007DCAE5 /* SnodeRequest.swift in Sources */,
FDF848CF29405C5B007DCAE5 /* SendMessageRequest.swift in Sources */,

View File

@ -10,23 +10,23 @@ public enum SNMessagingKit: MigratableTarget { // Just to make the external API
[
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self
],
], // Initial DB Creation
[
_003_YDBToGRDBMigration.self
],
], // YDB to GRDB Migration
[
_004_RemoveLegacyYDB.self
],
], // Legacy DB removal
[
_005_FixDeletedMessageReadState.self,
_006_FixHiddenModAdminSupport.self,
_007_HomeQueryOptimisationIndexes.self
],
], // Add job priorities
[
_008_EmojiReacts.self,
_009_OpenGroupPermission.self,
_010_AddThreadIdToFTS.self
], // Add job priorities
], // Fix thread FTS
[
_011_AddPendingReadReceipts.self,
_012_AddFTSIfNeeded.self,

View File

@ -263,7 +263,7 @@ public class Poller {
let lastHashes: [String] = namespacedResults
.compactMap { $0.value.data?.lastHash }
let otherKnownHashes: [String] = namespacedResults
.filter { $0.key.shouldDedupeMessages }
.filter { $0.key.shouldFetchSinceLastHash }
.compactMap { $0.value.data?.messages.map { $0.info.hash } }
.reduce([], +)
var messageCount: Int = 0

View File

@ -12,14 +12,18 @@ public enum SNSnodeKit: MigratableTarget { // Just to make the external API nice
[
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self
],
], // Initial DB Creation
[
_003_YDBToGRDBMigration.self
],
], // YDB to GRDB Migration
[
_004_FlagMessageHashAsDeletedOrInvalid.self
],
[] // Add job priorities
], // Legacy DB removal
[], // Add job priorities
[], // Fix thread FTS
[
_005_AddSnodeReveivedMessageInfoPrimaryKey.self
]
]
)
}

View File

@ -40,7 +40,7 @@ enum _001_InitialSetupMigration: Migration {
}
try db.create(table: SnodeReceivedMessageInfo.self) { t in
t.column(.id, .integer)
t.deprecatedColumn(name: "id", .integer) // stringlint:disable
.notNull()
.primaryKey(autoincrement: true)
t.column(.key, .text)

View File

@ -0,0 +1,72 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionUtilitiesKit
enum _005_AddSnodeReveivedMessageInfoPrimaryKey: Migration {
static let target: TargetMigrations.Identifier = .snodeKit
static let identifier: String = "AddSnodeReveivedMessageInfoPrimaryKey" // stringlint:disable
static let needsConfigSync: Bool = false
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
/// This migration adds a flat to the `SnodeReceivedMessageInfo` so that when deleting interactions we can
/// ignore their hashes when subsequently trying to fetch new messages (which results in the storage server returning
/// messages from the beginning of time)
static let minExpectedRunDuration: TimeInterval = 0.2
static func migrate(_ db: Database) throws {
// SQLite doesn't support adding a new primary key after creation so we need to create a new table with
// the setup we want, copy data from the old table over, drop the old table and rename the new table
struct TmpSnodeReceivedMessageInfo: Codable, TableRecord, FetchableRecord, PersistableRecord, ColumnExpressible {
static var databaseTableName: String { "tmpSnodeReceivedMessageInfo" }
typealias Columns = CodingKeys
enum CodingKeys: String, CodingKey, ColumnExpression {
case key
case hash
case expirationDateMs
case wasDeletedOrInvalid
}
let key: String
let hash: String
let expirationDateMs: Int64
var wasDeletedOrInvalid: Bool?
}
try db.create(table: TmpSnodeReceivedMessageInfo.self) { t in
t.column(.key, .text).notNull()
t.column(.hash, .text).notNull()
t.column(.expirationDateMs, .integer).notNull()
t.column(.wasDeletedOrInvalid, .boolean)
t.primaryKey([.key, .hash])
}
// Insert into the new table, drop the old table and rename the new table to be the old one
let tmpInfo: TypedTableAlias<TmpSnodeReceivedMessageInfo> = TypedTableAlias()
let info: TypedTableAlias<SnodeReceivedMessageInfo> = TypedTableAlias()
try db.execute(literal: """
INSERT INTO \(tmpInfo)
SELECT \(info[.key]), \(info[.hash]), \(info[.expirationDateMs]), \(info[.wasDeletedOrInvalid])
FROM \(info)
""")
try db.drop(table: SnodeReceivedMessageInfo.self)
try db.rename(
table: TmpSnodeReceivedMessageInfo.databaseTableName,
to: SnodeReceivedMessageInfo.databaseTableName
)
// Need to create the indexes separately from creating 'TmpGroupMember' to ensure they
// have the correct names
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.key])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.hash])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.expirationDateMs])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.wasDeletedOrInvalid])
Storage.update(progress: 1, for: self, in: target)
}
}

View File

@ -9,17 +9,12 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
public typealias Columns = CodingKeys
public enum CodingKeys: String, CodingKey, ColumnExpression {
case id
case key
case hash
case expirationDateMs
case wasDeletedOrInvalid
}
/// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into
/// the database yet this value will be `nil`
public var id: Int64? = nil
/// The key this message hash is associated to
///
/// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows
@ -41,12 +36,6 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
///
/// **Note:** When retrieving the `lastNotExpired` we will ignore any entries where this flag is true
public var wasDeletedOrInvalid: Bool?
// MARK: - Custom Database Interaction
public mutating func didInsert(_ inserted: InsertionSuccess) {
self.id = inserted.rowID
}
}
// MARK: - Convenience
@ -133,7 +122,7 @@ public extension SnodeReceivedMessageInfo {
)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs > SnodeAPI.currentOffsetTimestampMs())
.order(SnodeReceivedMessageInfo.Columns.id.desc)
.order(Column.rowID.desc)
.fetchOne(db)
// If we have a non-legacy hash then return it immediately (legacy hashes had a different
@ -146,7 +135,7 @@ public extension SnodeReceivedMessageInfo {
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid == false
)
.filter(SnodeReceivedMessageInfo.Columns.key == publicKey)
.order(SnodeReceivedMessageInfo.Columns.id.desc)
.order(Column.rowID.desc)
.fetchOne(db)
}
}

View File

@ -1,4 +1,6 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable
import Foundation
@ -40,12 +42,9 @@ public extension SnodeAPI {
public var shouldFetchSinceLastHash: Bool { true }
/// This flag indicates whether we should dedupe messages from the specified namespace, when `true` we will
/// store a `SnodeReceivedMessageInfo` record for the message and check for a matching record whenever
/// we receive a message from this namespace
///
/// **Note:** An additional side-effect of this flag is that when we poll for messages from the specified namespace
/// we will always retrieve **all** messages from the namespace (instead of just new messages since the last one
/// we have seen)
/// attempt to `insert` a `SnodeReceivedMessageInfo` record (which will fail if we had already processed this
/// message previously), when `false` we will still `upsert` a record so we don't run into the unique constraint allowing
/// re-processing of a previously processed message
public var shouldDedupeMessages: Bool {
switch self {
case .`default`, .legacyClosedGroup: return true

View File

@ -13,10 +13,12 @@ public enum SNUIKit: MigratableTarget {
// SNUIKit migrations
[], // Initial DB Creation
[], // YDB to GRDB Migration
[], // YDB Removal
[], // Legacy DB removal
[
_001_ThemePreferences.self
] // Add job priorities
], // Add job priorities
[], // Fix thread FTS
[]
]
)
}

View File

@ -19,12 +19,14 @@ public enum SNUtilitiesKit: MigratableTarget { // Just to make the external API
_001_InitialSetupMigration.self,
_002_SetupStandardJobs.self,
_003_YDBToGRDBMigration.self
],
[], // Other DB migrations
], // Initial DB Creation
[], // YDB to GRDB Migration
[], // Legacy DB removal
[
_004_AddJobPriority.self
]
], // Add job priorities
[], // Fix thread FTS
[]
]
)
}