Cleaned up the outdated message logic and fixed a few bugs

Fixed a bug where showing hidden conversations would appear at the bottom of the list
Fixed a bug where you would be kicked from a one-to-one conversation when opening it if you had hidden it
Fixed a bug where joining a community via URL wasn't automatically opening the community
Fixed a bug where the community poller could trigger again before the previous poll completed
Fixed an edge-case where community messages could be missed if the app crashed at the right time when processing a poll response
This commit is contained in:
Morgan Pretty 2023-06-13 14:40:04 +10:00
parent 44469d9078
commit 1ba060b7f0
14 changed files with 164 additions and 150 deletions

View File

@ -1587,6 +1587,10 @@ final class ConversationVC: BaseVC, SessionUtilRespondingViewController, Convers
self.scrollButton.alpha = self.getScrollButtonOpacity()
self.unreadCountView.alpha = self.scrollButton.alpha
// The initial scroll can trigger this logic but we already mark the initially focused message
// as read so don't run the below until the user actually scrolls after the initial layout
guard self.didFinishInitialLayout else { return }
// We want to mark messages as read while we scroll, so grab the newest message and mark
// everything older as read
//

View File

@ -311,6 +311,19 @@ extension GlobalSearchViewController {
return
}
// If it's a one-to-one thread then make sure the thread exists before pushing to it (in case the
// contact has been hidden)
if threadVariant == .contact {
Storage.shared.write { db in
try SessionThread.fetchOrCreate(
db,
id: threadId,
variant: threadVariant,
shouldBeVisible: nil // Don't change current state
)
}
}
let viewController: ConversationVC = ConversationVC(
threadId: threadId,
threadVariant: threadVariant,

View File

@ -159,10 +159,10 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC
return
}
joinOpenGroup(roomToken: room, server: server, publicKey: publicKey)
joinOpenGroup(roomToken: room, server: server, publicKey: publicKey, shouldOpenCommunity: true)
}
fileprivate func joinOpenGroup(roomToken: String, server: String, publicKey: String, shouldOpenCommunity: Bool = false) {
fileprivate func joinOpenGroup(roomToken: String, server: String, publicKey: String, shouldOpenCommunity: Bool) {
guard !isJoining, let navigationController: UINavigationController = navigationController else { return }
isJoining = true

View File

@ -277,15 +277,9 @@ public extension Message {
return processedMessage
}
catch {
// If we get 'selfSend' or 'duplicateControlMessage' errors then we still want to insert
// the SnodeReceivedMessageInfo to prevent retrieving and attempting to process the same
// message again (as well as ensure the next poll doesn't retrieve the same message)
switch error {
case MessageReceiverError.selfSend, MessageReceiverError.duplicateControlMessage:
_ = try? rawMessage.info.inserted(db)
break
default: break
// For some error cases we want to update the last hash so do so
if (error as? MessageReceiverError)?.shouldUpdateLastHash == true {
_ = try? rawMessage.info.inserted(db)
}
throw error

View File

@ -605,18 +605,19 @@ public final class OpenGroupManager {
return
}
let seqNo: Int64? = messages.map { $0.seqNo }.max()
let sortedMessages: [OpenGroupAPI.Message] = messages
.filter { $0.deleted != true }
.sorted { lhs, rhs in lhs.id < rhs.id }
var messageServerIdsToRemove: [Int64] = messages
var messageServerInfoToRemove: [(id: Int64, seqNo: Int64)] = messages
.filter { $0.deleted == true }
.map { $0.id }
if let seqNo: Int64 = seqNo {
.map { ($0.id, $0.seqNo) }
let updateSeqNo: (Database, String, inout Int64, Int64, OGMDependencies) -> () = { db, openGroupId, lastValidSeqNo, seqNo, dependencies in
// Only update the data if the 'seqNo' is larger than the lastValidSeqNo (only want it to increase)
guard seqNo > lastValidSeqNo else { return }
// Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId')
_ = try? OpenGroup
.filter(id: openGroup.id)
.filter(id: openGroupId)
.updateAll(db, OpenGroup.Columns.sequenceNumber.set(to: seqNo))
// Update pendingChange cache
@ -624,13 +625,18 @@ public final class OpenGroupManager {
$0.pendingChanges = $0.pendingChanges
.filter { $0.seqNo == nil || $0.seqNo! > seqNo }
}
// Update the inout value
lastValidSeqNo = seqNo
}
// Process the messages
var lastValidSeqNo: Int64 = -1
sortedMessages.forEach { message in
if message.base64EncodedData == nil && message.reactions == nil {
messageServerIdsToRemove.append(Int64(message.id))
return
messageServerInfoToRemove.append((message.id, message.seqNo))
return updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies)
}
// Handle messages
@ -657,6 +663,7 @@ public final class OpenGroupManager {
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
dependencies: dependencies
)
updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies)
}
}
catch {
@ -701,6 +708,7 @@ public final class OpenGroupManager {
openGroupMessageServerId: message.id,
openGroupReactions: reactions
)
updateSeqNo(db, openGroup.id, &lastValidSeqNo, message.seqNo, dependencies)
}
catch {
SNLog("Couldn't handle open group reactions due to error: \(error).")
@ -709,12 +717,18 @@ public final class OpenGroupManager {
}
// Handle any deletions that are needed
guard !messageServerIdsToRemove.isEmpty else { return }
guard !messageServerInfoToRemove.isEmpty else { return }
let messageServerIdsToRemove: [Int64] = messageServerInfoToRemove.map { $0.id }
_ = try? Interaction
.filter(Interaction.Columns.threadId == openGroup.threadId)
.filter(messageServerIdsToRemove.contains(Interaction.Columns.openGroupServerMessageId))
.deleteAll(db)
// Update the seqNo for deletions
if let lastDeletionSeqNo: Int64 = messageServerInfoToRemove.map({ $0.seqNo }).max() {
updateSeqNo(db, openGroup.id, &lastValidSeqNo, lastDeletionSeqNo, dependencies)
}
}
internal static func handleDirectMessages(

View File

@ -34,6 +34,19 @@ public enum MessageReceiverError: LocalizedError {
default: return true
}
}
public var shouldUpdateLastHash: Bool {
switch self {
// If we get one of these errors then we still want to update the last hash to prevent
// retrieving and attempting to process the same messages again (as well as ensure the
// next poll doesn't retrieve the same message - these errors are essentially considered
// "already successfully processed")
case .selfSend, .duplicateControlMessage, .outdatedMessage:
return true
default: return false
}
}
public var errorDescription: String? {
switch self {

View File

@ -13,31 +13,8 @@ extension MessageReceiver {
threadVariant: SessionThread.Variant,
message: CallMessage
) throws {
let timestampMs: Int64 = (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs())
// Only support calls from contact threads
guard
threadVariant == .contact,
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
(
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: .contacts,
changeTimestampMs: timestampMs
)
)
else { return }
guard threadVariant == .contact else { return }
switch message.kind {
case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message)

View File

@ -3,7 +3,6 @@
import Foundation
import GRDB
import SessionSnodeKit
import SessionUtilitiesKit
extension MessageReceiver {
internal static func handleDataExtractionNotification(
@ -12,46 +11,12 @@ extension MessageReceiver {
threadVariant: SessionThread.Variant,
message: DataExtractionNotification
) throws {
let timestampMs: Int64 = (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
guard
threadVariant == .contact,
let sender: String = message.sender,
let messageKind: DataExtractionNotification.Kind = message.kind
else { throw MessageReceiverError.invalidMessage }
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: {
switch threadVariant {
case .contact:
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: timestampMs
)
else { throw MessageReceiverError.outdatedMessage }
_ = try Interaction(
serverHash: message.serverHash,
threadId: threadId,
@ -62,7 +27,10 @@ extension MessageReceiver {
case .mediaSaved: return .infoMediaSavedNotification
}
}(),
timestampMs: timestampMs
timestampMs: (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
).inserted(db)
}
}

View File

@ -13,10 +13,6 @@ extension MessageReceiver {
dependencies: SMKDependencies
) throws {
let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies)
let timestampMs: Int64 = (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
var blindedContactIds: [String] = []
// Ignore messages which were sent from the current user
@ -25,26 +21,6 @@ extension MessageReceiver {
let senderId: String = message.sender
else { throw MessageReceiverError.invalidMessage }
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: senderId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: senderId,
threadVariant: .contact,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: senderId,
targetConfig: .contacts,
changeTimestampMs: timestampMs
)
else { throw MessageReceiverError.outdatedMessage }
// Update profile if needed (want to do this regardless of whether the message exists or
// not to ensure the profile info gets sync between a users devices at every chance)
if let profile = message.profile {
@ -160,7 +136,10 @@ extension MessageReceiver {
threadId: unblindedThread.id,
authorId: senderId,
variant: .infoMessageRequestAccepted,
timestampMs: timestampMs
timestampMs: (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
).inserted(db)
}

View File

@ -23,32 +23,6 @@ extension MessageReceiver {
// seconds to maintain the accuracy)
let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000)
let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false)
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: {
switch threadVariant {
case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs())
)
else { throw MessageReceiverError.outdatedMessage }
// Update profile if needed (want to do this regardless of whether the message exists or
// not to ensure the profile info gets sync between a users devices at every chance)
@ -90,6 +64,7 @@ extension MessageReceiver {
}
// Store the message variant so we can run variant-specific behaviours
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
let thread: SessionThread = try SessionThread
.fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil)
let maybeOpenGroup: OpenGroup? = {

View File

@ -192,6 +192,15 @@ public enum MessageReceiver {
SessionUtil.conversationInConfig(db, threadId: threadId, threadVariant: threadVariant, visibleOnly: false)
else { throw MessageReceiverError.requiredThreadNotInConfig }
// Throw if the message is outdated and shouldn't be processed
try throwIfMessageOutdated(
db,
message: message,
threadId: threadId,
threadVariant: threadVariant,
dependencies: dependencies
)
switch message {
case let message as ReadReceipt:
try MessageReceiver.handleReadReceipt(
@ -315,7 +324,8 @@ public enum MessageReceiver {
.filter(id: threadId)
.updateAllAndConfig(
db,
SessionThread.Columns.shouldBeVisible.set(to: true)
SessionThread.Columns.shouldBeVisible.set(to: true),
SessionThread.Columns.pinnedPriority.set(to: SessionUtil.visiblePriority)
)
}
}
@ -344,4 +354,44 @@ public enum MessageReceiver {
try reaction.with(interactionId: interactionId).insert(db)
}
}
public static func throwIfMessageOutdated(
_ db: Database,
message: Message,
threadId: String,
threadVariant: SessionThread.Variant,
dependencies: SMKDependencies = SMKDependencies()
) throws {
switch message {
case is ReadReceipt: return // No visible artifact created so better to keep for more reliable read states
case is UnsendRequest: return // We should always process the removal of messages just in case
default: break
}
// Determine the state of the conversation and the validity of the message
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
let conversationVisibleInConfig: Bool = SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
)
let canPerformChange: Bool = SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: {
switch threadVariant {
case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs())
)
// If the thread is visible or the message was sent more recently than the last config message (minus
// buffer period) then we should process the message, if not then throw as the message is outdated
guard !conversationVisibleInConfig && !canPerformChange else { return }
throw MessageReceiverError.outdatedMessage
}
}

View File

@ -53,16 +53,32 @@ extension OpenGroupAPI {
.fetchOne(db)
}
.defaulting(to: 0)
let lastPollStart: TimeInterval = Date().timeIntervalSince1970
let nextPollInterval: TimeInterval = getInterval(for: minPollFailureCount, minInterval: Poller.minPollInterval, maxInterval: Poller.maxPollInterval)
poll(using: dependencies).sinkUntilComplete()
timer = Timer.scheduledTimerOnMainThread(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in
timer.invalidate()
Threading.pollerQueue.async {
self?.pollRecursively(using: dependencies)
}
}
// Wait until the last poll completes before polling again ensuring we don't poll any faster than
// the 'nextPollInterval' value
poll(using: dependencies)
.sinkUntilComplete(
receiveCompletion: { [weak self] _ in
let currentTime: TimeInterval = Date().timeIntervalSince1970
let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart))
guard remainingInterval > 0 else {
return Threading.pollerQueue.async {
self?.pollRecursively(using: dependencies)
}
}
self?.timer = Timer.scheduledTimerOnMainThread(withTimeInterval: remainingInterval, repeats: false) { timer in
timer.invalidate()
Threading.pollerQueue.async {
self?.pollRecursively(using: dependencies)
}
}
}
)
}
public func poll(

View File

@ -34,11 +34,14 @@ internal extension SessionUtil {
return !allColumnsThatTriggerConfigUpdate.isDisjoint(with: targetColumns)
}
/// A `0` `priority` value indicates visible, but not pinned
static let visiblePriority: Int32 = 0
/// A negative `priority` value indicates hidden
static let hiddenPriority: Int32 = -1
static func shouldBeVisible(priority: Int32) -> Bool {
return (priority >= 0)
return (priority >= SessionUtil.visiblePriority)
}
static func performAndPushChange(
@ -127,8 +130,8 @@ internal extension SessionUtil {
guard noteToSelf.shouldBeVisible else { return SessionUtil.hiddenPriority }
return noteToSelf.pinnedPriority
.map { Int32($0 == 0 ? 0 : max($0, 1)) }
.defaulting(to: 0)
.map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) }
.defaulting(to: SessionUtil.visiblePriority)
}(),
in: conf
)
@ -154,8 +157,8 @@ internal extension SessionUtil {
guard thread.shouldBeVisible else { return SessionUtil.hiddenPriority }
return thread.pinnedPriority
.map { Int32($0 == 0 ? 0 : max($0, 1)) }
.defaulting(to: 0)
.map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) }
.defaulting(to: SessionUtil.visiblePriority)
}()
)
},
@ -176,8 +179,8 @@ internal extension SessionUtil {
CommunityInfo(
urlInfo: urlInfo,
priority: thread.pinnedPriority
.map { Int32($0 == 0 ? 0 : max($0, 1)) }
.defaulting(to: 0)
.map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) }
.defaulting(to: SessionUtil.visiblePriority)
)
}
},
@ -197,8 +200,8 @@ internal extension SessionUtil {
LegacyGroupInfo(
id: thread.id,
priority: thread.pinnedPriority
.map { Int32($0 == 0 ? 0 : max($0, 1)) }
.defaulting(to: 0)
.map { Int32($0 == 0 ? SessionUtil.visiblePriority : max($0, 1)) }
.defaulting(to: SessionUtil.visiblePriority)
)
},
in: conf

View File

@ -75,6 +75,14 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension
return
}
// Throw if the message is outdated and shouldn't be processed
try MessageReceiver.throwIfMessageOutdated(
db,
message: processedMessage.messageInfo.message,
threadId: processedMessage.threadId,
threadVariant: processedMessage.threadVariant
)
switch processedMessage.messageInfo.message {
case let visibleMessage as VisibleMessage:
let interactionId: Int64 = try MessageReceiver.handleVisibleMessage(
@ -174,7 +182,7 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension
catch {
if let error = error as? MessageReceiverError, error.isRetryable {
switch error {
case .invalidGroupPublicKey, .noGroupKeyPair: self.completeSilenty()
case .invalidGroupPublicKey, .noGroupKeyPair, .outdatedMessage: self.completeSilenty()
default: self.handleFailure(for: notificationContent)
}
}