Fixed a couple of bugs and made some performance tweaks

Cleaned up some duplicate poller logic (avoid going back to the main queue)
Updated the code to remove a profile image if a user sends a message which doesn't have a profile image (ie. they've explicitly removed it)
Fixed an issue where some more logic could incorrectly run in the DBWrite queue
Fixed a bug where the OpenGroupPoller could stop polling when getting an error
Fixed a bug where messages which had the same timestamp wouldn't get correctly marked as read when scrolling under the right circumstances
This commit is contained in:
Morgan Pretty 2023-07-10 17:56:58 +10:00
parent a5306f85b7
commit 38420997b0
10 changed files with 218 additions and 266 deletions

View File

@ -6599,7 +6599,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -6671,7 +6671,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO;
@ -6736,7 +6736,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -6810,7 +6810,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO;
@ -7718,7 +7718,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",
@ -7789,7 +7789,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 417;
CURRENT_PROJECT_VERSION = 418;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",

View File

@ -213,6 +213,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
// MARK: - Interaction Data
private var lastInteractionIdMarkedAsRead: Int64? = nil
private var lastInteractionTimestampMsMarkedAsRead: Int64 = 0
public private(set) var unobservedInteractionDataChanges: ([SectionModel], StagedChangeset<[SectionModel]>)?
public private(set) var interactionData: [SectionModel] = []
@ -651,8 +652,8 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
/// Since this method now gets triggered when scrolling we want to try to optimise it and avoid busying the database
/// write queue when it isn't needed, in order to do this we:
/// - Throttle the updates to 100ms (quick enough that users shouldn't notice, but will help the DB when the user flings the list)
/// - Don't bother marking anything as read if this was called with the same `interactionId` that we previously marked as
/// read (ie. when scrolling and the last message hasn't changed)
/// - Only mark interactions as read if they have newer `timestampMs` or `id` values (ie. were sent later or were more-recent
/// entries in the database), **Note:** Old messages will be marked as read upon insertion so shouldn't be an issue
///
/// The `ThreadViewModel.markAsRead` method also tries to avoid marking as read if a conversation is already fully read
if markAsReadPublisher == nil {
@ -662,10 +663,11 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
receiveOutput: { [weak self] target, timestampMs in
switch target {
case .thread: self?.threadData.markAsRead(target: target)
case .threadAndInteractions:
case .threadAndInteractions(let interactionId):
guard
timestampMs == nil ||
(self?.lastInteractionTimestampMsMarkedAsRead ?? 0) < (timestampMs ?? 0)
(self?.lastInteractionTimestampMsMarkedAsRead ?? 0) < (timestampMs ?? 0) ||
(self?.lastInteractionIdMarkedAsRead ?? 0) < (interactionId ?? 0)
else {
self?.threadData.markAsRead(target: .thread)
return
@ -677,6 +679,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
self?.lastInteractionTimestampMsMarkedAsRead = timestampMs
}
self?.lastInteractionIdMarkedAsRead = (interactionId ?? self?.threadData.interactionId)
self?.threadData.markAsRead(target: target)
}
}

View File

@ -521,61 +521,65 @@ public final class OpenGroupManager {
}
}
db.afterNextTransactionNested { db in
// Start the poller if needed
if dependencies.cache.pollers[server.lowercased()] == nil {
dependencies.mutableCache.mutate {
$0.pollers[server.lowercased()]?.stop()
$0.pollers[server.lowercased()] = OpenGroupAPI.Poller(for: server.lowercased())
db.afterNextTransactionNested { _ in
// Dispatch async to the workQueue to prevent holding up the DBWrite thread from the
// above transaction
OpenGroupAPI.workQueue.async {
// Start the poller if needed
if dependencies.cache.pollers[server.lowercased()] == nil {
dependencies.mutableCache.mutate {
$0.pollers[server.lowercased()]?.stop()
$0.pollers[server.lowercased()] = OpenGroupAPI.Poller(for: server.lowercased())
}
dependencies.cache.pollers[server.lowercased()]?.startIfNeeded(using: dependencies)
}
dependencies.cache.pollers[server.lowercased()]?.startIfNeeded(using: dependencies)
}
/// Start downloading the room image (if we don't have one or it's been updated)
if
let imageId: String = (pollInfo.details?.imageId ?? openGroup.imageId),
(
openGroup.imageData == nil ||
openGroup.imageId != imageId
)
{
OpenGroupManager
.roomImage(
fileId: imageId,
for: roomToken,
on: server,
existingData: openGroup.imageData,
using: dependencies
/// Start downloading the room image (if we don't have one or it's been updated)
if
let imageId: String = (pollInfo.details?.imageId ?? openGroup.imageId),
(
openGroup.imageData == nil ||
openGroup.imageId != imageId
)
// Note: We need to subscribe and receive on different threads to ensure the
// logic in 'receiveValue' doesn't result in a reentrancy database issue
.subscribe(on: OpenGroupAPI.workQueue)
.receive(on: DispatchQueue.global(qos: .default))
.sinkUntilComplete(
receiveCompletion: { _ in
if waitForImageToComplete {
completion?()
{
OpenGroupManager
.roomImage(
fileId: imageId,
for: roomToken,
on: server,
existingData: openGroup.imageData,
using: dependencies
)
// Note: We need to subscribe and receive on different threads to ensure the
// logic in 'receiveValue' doesn't result in a reentrancy database issue
.subscribe(on: OpenGroupAPI.workQueue)
.receive(on: DispatchQueue.global(qos: .default))
.sinkUntilComplete(
receiveCompletion: { _ in
if waitForImageToComplete {
completion?()
}
},
receiveValue: { data in
dependencies.storage.write { db in
_ = try OpenGroup
.filter(id: threadId)
.updateAll(db, OpenGroup.Columns.imageData.set(to: data))
}
}
},
receiveValue: { data in
dependencies.storage.write { db in
_ = try OpenGroup
.filter(id: threadId)
.updateAll(db, OpenGroup.Columns.imageData.set(to: data))
}
}
)
}
else if waitForImageToComplete {
)
}
else if waitForImageToComplete {
completion?()
}
// If we want to wait for the image to complete then don't call the completion here
guard !waitForImageToComplete else { return }
// Finish
completion?()
}
// If we want to wait for the image to complete then don't call the completion here
guard !waitForImageToComplete else { return }
// Finish
completion?()
}
}

View File

@ -35,7 +35,7 @@ extension MessageReceiver {
guard
let profilePictureUrl: String = profile.profilePictureUrl,
let profileKey: Data = profile.profileKey
else { return .none }
else { return .remove }
return .updateTo(
url: profilePictureUrl,

View File

@ -14,7 +14,7 @@ public final class ClosedGroupPoller: Poller {
override var namespaces: [SnodeAPI.Namespace] { ClosedGroupPoller.namespaces }
override var maxNodePollCount: UInt { 0 }
private static let minPollInterval: Double = 2
private static let minPollInterval: Double = 3
private static let maxPollInterval: Double = 30
// MARK: - Initialization
@ -78,30 +78,12 @@ public final class ClosedGroupPoller: Poller {
return nextPollInterval
}
override func getSnodeForPolling(
for publicKey: String
) -> AnyPublisher<Snode, Error> {
return SnodeAPI.getSwarm(for: publicKey)
.tryMap { swarm -> Snode in
guard let snode: Snode = swarm.randomElement() else {
throw OnionRequestAPIError.insufficientSnodes
}
return snode
}
.eraseToAnyPublisher()
}
override func handlePollError(
_ error: Error,
for publicKey: String,
using dependencies: SMKDependencies = SMKDependencies()
) {
) -> Bool {
SNLog("Polling failed for closed group with public key: \(publicKey) due to error: \(error).")
// Try to restart the poller from scratch
Threading.pollerQueue.async { [weak self] in
self?.setUpPolling(for: publicKey, using: dependencies)
}
return true
}
}

View File

@ -11,9 +11,6 @@ public final class CurrentUserPoller: Poller {
public static var namespaces: [SnodeAPI.Namespace] = [
.default, .configUserProfile, .configContacts, .configConvoInfoVolatile, .configUserGroups
]
private var targetSnode: Atomic<Snode?> = Atomic(nil)
private var usedSnodes: Atomic<Set<Snode>> = Atomic([])
// MARK: - Settings
@ -63,53 +60,16 @@ public final class CurrentUserPoller: Poller {
return min(maxRetryInterval, nextDelay)
}
override func getSnodeForPolling(
for publicKey: String
) -> AnyPublisher<Snode, Error> {
if let targetSnode: Snode = self.targetSnode.wrappedValue {
return Just(targetSnode)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
// Used the cached swarm for the given key and update the list of unusedSnodes
let swarm: Set<Snode> = (SnodeAPI.swarmCache.wrappedValue[publicKey] ?? [])
let unusedSnodes: Set<Snode> = swarm.subtracting(usedSnodes.wrappedValue)
// randomElement() uses the system's default random generator, which is cryptographically secure
if let nextSnode: Snode = unusedSnodes.randomElement() {
self.targetSnode.mutate { $0 = nextSnode }
self.usedSnodes.mutate { $0.insert(nextSnode) }
return Just(nextSnode)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
// If we haven't retrieved a target snode at this point then either the cache
// is empty or we have used all of the snodes and need to start from scratch
return SnodeAPI.getSwarm(for: publicKey)
.tryFlatMap { [weak self] _ -> AnyPublisher<Snode, Error> in
guard let strongSelf = self else { throw SnodeAPIError.generic }
self?.targetSnode.mutate { $0 = nil }
self?.usedSnodes.mutate { $0.removeAll() }
return strongSelf.getSnodeForPolling(for: publicKey)
}
.eraseToAnyPublisher()
}
override func handlePollError(
_ error: Error,
for publicKey: String,
using dependencies: SMKDependencies = SMKDependencies()
) {
) -> Bool {
if UserDefaults.sharedLokiProject?[.isMainAppActive] != true {
// Do nothing when an error gets throws right after returning from the background (happens frequently)
}
else if let targetSnode: Snode = targetSnode.wrappedValue {
SNLog("Polling \(targetSnode) failed; dropping it and switching to next snode.")
SNLog("Main Poller polling \(targetSnode) failed; dropping it and switching to next snode.")
self.targetSnode.mutate { $0 = nil }
SnodeAPI.dropSnodeFromSwarmIfNeeded(targetSnode, publicKey: publicKey)
}
@ -117,9 +77,6 @@ public final class CurrentUserPoller: Poller {
SNLog("Polling failed due to having no target service node.")
}
// Try to restart the poller from scratch
Threading.pollerQueue.async { [weak self] in
self?.setUpPolling(for: publicKey, using: dependencies)
}
return true
}
}

View File

@ -57,49 +57,42 @@ extension OpenGroupAPI {
) {
guard hasStarted else { return }
dependencies.storage
.readPublisher { [server = server] db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.select(min(OpenGroup.Columns.pollFailureCount))
.asRequest(of: TimeInterval.self)
.fetchOne(db)
}
.tryFlatMap { [weak self] minPollFailureCount -> AnyPublisher<(TimeInterval, TimeInterval), Error> in
guard let strongSelf = self else { throw OpenGroupAPIError.invalidPoll }
let lastPollStart: TimeInterval = Date().timeIntervalSince1970
let nextPollInterval: TimeInterval = Poller.getInterval(
for: (minPollFailureCount ?? 0),
minInterval: Poller.minPollInterval,
maxInterval: Poller.maxPollInterval
)
// Wait until the last poll completes before polling again ensuring we don't poll any faster than
// the 'nextPollInterval' value
return strongSelf.poll(using: dependencies)
.map { _ in (lastPollStart, nextPollInterval) }
.eraseToAnyPublisher()
}
let server: String = self.server
let lastPollStart: TimeInterval = Date().timeIntervalSince1970
poll(using: dependencies)
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveValue: { [weak self] lastPollStart, nextPollInterval in
receiveCompletion: { [weak self] _ in
let minPollFailureCount: Int64 = dependencies.storage
.read { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.select(min(OpenGroup.Columns.pollFailureCount))
.asRequest(of: Int64.self)
.fetchOne(db)
}
.defaulting(to: 0)
// Calculate the remaining poll delay
let currentTime: TimeInterval = Date().timeIntervalSince1970
let nextPollInterval: TimeInterval = Poller.getInterval(
for: TimeInterval(minPollFailureCount),
minInterval: Poller.minPollInterval,
maxInterval: Poller.maxPollInterval
)
let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart))
// Schedule the next poll
guard remainingInterval > 0 else {
return dependencies.subscribeQueue.async {
self?.pollRecursively(using: dependencies)
}
}
self?.timer = Timer.scheduledTimerOnMainThread(withTimeInterval: remainingInterval, repeats: false) { timer in
timer.invalidate()
dependencies.subscribeQueue.async {
self?.pollRecursively(using: dependencies)
}
dependencies.subscribeQueue.asyncAfter(deadline: .now() + .milliseconds(Int(remainingInterval * 1000)), qos: .default) {
self?.pollRecursively(using: dependencies)
}
}
)
@ -227,7 +220,7 @@ extension OpenGroupAPI {
.defaulting(to: 0)
var prunedIds: [String] = []
Storage.shared.writeAsync { db in
dependencies.storage.writeAsync { db in
struct Info: Decodable, FetchableRecord {
let id: String
let shouldBeVisible: Bool

View File

@ -8,11 +8,14 @@ import SessionSnodeKit
import SessionUtilitiesKit
public class Poller {
private var timers: Atomic<[String: Timer]> = Atomic([:])
private var cancellables: Atomic<[String: AnyCancellable]> = Atomic([:])
internal var isPolling: Atomic<[String: Bool]> = Atomic([:])
internal var pollCount: Atomic<[String: Int]> = Atomic([:])
internal var failureCount: Atomic<[String: Int]> = Atomic([:])
internal var targetSnode: Atomic<Snode?> = Atomic(nil)
private var usedSnodes: Atomic<Set<Snode>> = Atomic([])
// MARK: - Settings
/// The namespaces which this poller queries
@ -20,7 +23,7 @@ public class Poller {
preconditionFailure("abstract class - override in subclass")
}
/// The number of times the poller can poll before swapping to a new snode
/// The number of times the poller can poll a single snode before swapping to a new snode
internal var maxNodePollCount: UInt {
preconditionFailure("abstract class - override in subclass")
}
@ -39,7 +42,7 @@ public class Poller {
public func stopPolling(for publicKey: String) {
isPolling.mutate { $0[publicKey] = false }
timers.mutate { $0[publicKey]?.invalidate() }
cancellables.mutate { $0[publicKey]?.cancel() }
}
// MARK: - Abstract Methods
@ -49,17 +52,13 @@ public class Poller {
preconditionFailure("abstract class - override in subclass")
}
/// Calculate the delay which should occur before the next poll
internal func nextPollDelay(for publicKey: String) -> TimeInterval {
preconditionFailure("abstract class - override in subclass")
}
internal func getSnodeForPolling(
for publicKey: String
) -> AnyPublisher<Snode, Error> {
preconditionFailure("abstract class - override in subclass")
}
internal func handlePollError(_ error: Error, for publicKey: String, using dependencies: SMKDependencies) {
/// Perform and logic which should occur when the poll errors, will stop polling if `false` is returned
internal func handlePollError(_ error: Error, for publicKey: String, using dependencies: SMKDependencies) -> Bool {
preconditionFailure("abstract class - override in subclass")
}
@ -75,48 +74,65 @@ public class Poller {
// and the timer is not created, if we mark the group as is polling
// after setUpPolling. So the poller may not work, thus misses messages
self?.isPolling.mutate { $0[publicKey] = true }
self?.setUpPolling(for: publicKey)
self?.pollRecursively(for: publicKey)
}
}
/// We want to initially trigger a poll against the target service node and then run the recursive polling,
/// if an error is thrown during the poll then this should automatically restart the polling
internal func setUpPolling(
internal func getSnodeForPolling(
for publicKey: String,
using dependencies: SMKDependencies = SMKDependencies(
subscribeQueue: Threading.pollerQueue,
receiveQueue: Threading.pollerQueue
)
) {
guard isPolling.wrappedValue[publicKey] == true else { return }
let namespaces: [SnodeAPI.Namespace] = self.namespaces
getSnodeForPolling(for: publicKey)
.flatMap { snode -> AnyPublisher<[Message], Error> in
Poller.poll(
namespaces: namespaces,
from: snode,
for: publicKey,
poller: self,
using: dependencies
)
}
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveCompletion: { [weak self] result in
switch result {
case .finished: self?.pollRecursively(for: publicKey, using: dependencies)
case .failure(let error):
guard self?.isPolling.wrappedValue[publicKey] == true else { return }
self?.handlePollError(error, for: publicKey, using: dependencies)
}
using dependencies: SMKDependencies = SMKDependencies()
) -> AnyPublisher<Snode, Error> {
// If we don't want to poll a snode multiple times then just grab a random one from the swarm
guard maxNodePollCount > 0 else {
return SnodeAPI.getSwarm(for: publicKey, using: dependencies)
.tryMap { swarm -> Snode in
try swarm.randomElement() ?? { throw OnionRequestAPIError.insufficientSnodes }()
}
)
.eraseToAnyPublisher()
}
// If we already have a target snode then use that
if let targetSnode: Snode = self.targetSnode.wrappedValue {
return Just(targetSnode)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
// Select the next unused snode from the swarm (if we've used them all then clear the used list and
// start cycling through them again)
return SnodeAPI.getSwarm(for: publicKey, using: dependencies)
.tryMap { [usedSnodes = self.usedSnodes, targetSnode = self.targetSnode] swarm -> Snode in
let unusedSnodes: Set<Snode> = swarm.subtracting(usedSnodes.wrappedValue)
// If we've used all of the SNodes then clear out the used list
if unusedSnodes.isEmpty {
usedSnodes.mutate { $0.removeAll() }
}
// Select the next SNode
let nextSnode: Snode = try swarm.randomElement() ?? { throw OnionRequestAPIError.insufficientSnodes }()
targetSnode.mutate { $0 = nextSnode }
usedSnodes.mutate { $0.insert(nextSnode) }
return nextSnode
}
.eraseToAnyPublisher()
}
internal func incrementPollCount(publicKey: String) {
guard maxNodePollCount > 0 else { return }
let pollCount: Int = (self.pollCount.wrappedValue[publicKey] ?? 0)
self.pollCount.mutate { $0[publicKey] = (pollCount + 1) }
// Check if we've polled the serice node too many times
guard pollCount > maxNodePollCount else { return }
// If we have polled this service node more than the maximum allowed then clear out
// the 'targetServiceNode' value
self.targetSnode.mutate { $0 = nil }
}
private func pollRecursively(
for publicKey: String,
using dependencies: SMKDependencies = SMKDependencies()
@ -124,65 +140,60 @@ public class Poller {
guard isPolling.wrappedValue[publicKey] == true else { return }
let namespaces: [SnodeAPI.Namespace] = self.namespaces
let nextPollInterval: TimeInterval = nextPollDelay(for: publicKey)
let lastPollStart: TimeInterval = Date().timeIntervalSince1970
let lastPollInterval: TimeInterval = nextPollDelay(for: publicKey)
let getSnodePublisher: AnyPublisher<Snode, Error> = getSnodeForPolling(for: publicKey)
timers.mutate {
$0[publicKey] = Timer.scheduledTimerOnMainThread(
withTimeInterval: nextPollInterval,
repeats: false
) { [weak self] timer in
timer.invalidate()
self?.getSnodeForPolling(for: publicKey)
.flatMap { snode -> AnyPublisher<[Message], Error> in
Poller.poll(
namespaces: namespaces,
from: snode,
for: publicKey,
poller: self,
using: dependencies
// Store the publisher intp the cancellables dictionary
cancellables.mutate { [weak self] cancellables in
cancellables[publicKey] = getSnodePublisher
.flatMap { snode -> AnyPublisher<[Message], Error> in
Poller.poll(
namespaces: namespaces,
from: snode,
for: publicKey,
poller: self,
using: dependencies
)
}
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sink(
receiveCompletion: { result in
switch result {
case .failure(let error):
// Determine if the error should stop us from polling anymore
guard self?.handlePollError(error, for: publicKey, using: dependencies) == true else {
return
}
case .finished: break
}
// Increment the poll count
self?.incrementPollCount(publicKey: publicKey)
// Calculate the remaining poll delay
let currentTime: TimeInterval = Date().timeIntervalSince1970
let nextPollInterval: TimeInterval = (
self?.nextPollDelay(for: publicKey) ??
lastPollInterval
)
}
.subscribe(on: dependencies.subscribeQueue)
.receive(on: dependencies.receiveQueue)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .failure(let error): self?.handlePollError(error, for: publicKey, using: dependencies)
case .finished:
let maxNodePollCount: UInt = (self?.maxNodePollCount ?? 0)
// If we have polled this service node more than the
// maximum allowed then throw an error so the parent
// loop can restart the polling
if maxNodePollCount > 0 {
let pollCount: Int = (self?.pollCount.wrappedValue[publicKey] ?? 0)
self?.pollCount.mutate { $0[publicKey] = (pollCount + 1) }
guard pollCount < maxNodePollCount else {
let newSnodeNextPollInterval: TimeInterval = (self?.nextPollDelay(for: publicKey) ?? nextPollInterval)
self?.timers.mutate {
$0[publicKey] = Timer.scheduledTimerOnMainThread(
withTimeInterval: newSnodeNextPollInterval,
repeats: false
) { [weak self] timer in
timer.invalidate()
self?.pollCount.mutate { $0[publicKey] = 0 }
self?.setUpPolling(for: publicKey, using: dependencies)
}
}
return
}
}
// Otherwise just loop
self?.pollRecursively(for: publicKey, using: dependencies)
let remainingInterval: TimeInterval = max(0, nextPollInterval - (currentTime - lastPollStart))
// Schedule the next poll
guard remainingInterval > 0 else {
return dependencies.subscribeQueue.async {
self?.pollRecursively(for: publicKey, using: dependencies)
}
}
)
}
dependencies.subscribeQueue.asyncAfter(deadline: .now() + .milliseconds(Int(remainingInterval * 1000)), qos: .default) {
self?.pollRecursively(for: publicKey, using: dependencies)
}
},
receiveValue: { _ in }
)
}
}
@ -199,6 +210,7 @@ public class Poller {
isBackgroundPollValid: @escaping (() -> Bool) = { true },
poller: Poller? = nil,
using dependencies: SMKDependencies = SMKDependencies(
subscribeQueue: Threading.pollerQueue,
receiveQueue: Threading.pollerQueue
)
) -> AnyPublisher<[Message], Error> {

View File

@ -80,7 +80,8 @@ internal extension SessionUtil {
try Interaction
.filter(
Interaction.Columns.threadId == threadId &&
Interaction.Columns.timestampMs <= lastReadTimestampMs
Interaction.Columns.timestampMs <= lastReadTimestampMs &&
Interaction.Columns.wasRead == false
)
.updateAll( // Handling a config update so don't use `updateAllAndConfig`
db,

View File

@ -158,6 +158,9 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
/// to avoid blocking the DBWrite thread we dispatch to a serial `commitProcessingQueue` to process the incoming changes (in the past not doing
/// so was resulting in hanging when there was a lot of activity happening)
public func databaseDidCommit(_ db: Database) {
// If there were no pending changes in the commit then do nothing
guard !self.changesInCommit.wrappedValue.isEmpty else { return }
// Since we can't be sure the behaviours of 'databaseDidChange' and 'databaseDidCommit' won't change in
// the future we extract and clear the values in 'changesInCommit' since it's 'Atomic<T>' so will different
// threads modifying the data resulting in us missing a change
@ -174,9 +177,6 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
}
private func processDatabaseCommit(committedChanges: Set<PagedData.TrackedChange>) {
// Do nothing when there are no changes
guard !committedChanges.isEmpty else { return }
typealias AssociatedDataInfo = [(hasChanges: Bool, data: ErasedAssociatedRecord)]
typealias UpdatedData = (cache: DataCache<T>, pageInfo: PagedData.PageInfo, hasChanges: Bool, associatedData: AssociatedDataInfo)