Fixed a few of issues in the last commit

Fixed a couple of build issues where I missed a couple of calls to removed functions
Fixed a EXC_BAD_ACCESS issue where the 'poll' function could be called from multiple threads (which accesses and mutates variables)
Cleaned up the MessageRequestResponse handling a little
This commit is contained in:
Morgan Pretty 2022-03-01 17:30:35 +11:00
parent a26ee12f8d
commit 3a75639285
5 changed files with 103 additions and 46 deletions

View file

@ -15,19 +15,19 @@ public final class OpenGroupAPI: NSObject {
// MARK: - Polling State // MARK: - Polling State
private static var hasPerformedInitialPoll: [String: Bool] = [:] private static var hasPerformedInitialPoll: AtomicDict<String, Bool> = AtomicDict()
private static var timeSinceLastPoll: [String: TimeInterval] = [:] private static var timeSinceLastPoll: AtomicDict<String, TimeInterval> = AtomicDict()
private static var lastPollTime: TimeInterval = .greatestFiniteMagnitude private static var lastPollTime: Atomic<TimeInterval> = Atomic(.greatestFiniteMagnitude)
private static let timeSinceLastOpen: TimeInterval = { private static let timeSinceLastOpen: Atomic<TimeInterval> = {
guard let lastOpen = UserDefaults.standard[.lastOpen] else { return .greatestFiniteMagnitude } guard let lastOpen = UserDefaults.standard[.lastOpen] else { return Atomic(.greatestFiniteMagnitude) }
return Date().timeIntervalSince(lastOpen) return Atomic(Date().timeIntervalSince(lastOpen))
}() }()
// TODO: Remove these // TODO: Remove these
private static var legacyAuthTokenPromises: Atomic<[String: Promise<String>]> = Atomic([:]) private static var legacyAuthTokenPromises: AtomicDict<String, Promise<String>> = AtomicDict()
private static var legacyHasUpdatedLastOpenDate = false private static var legacyHasUpdatedLastOpenDate = false
private static var legacyGroupImagePromises: [String: Promise<Data>] = [:] private static var legacyGroupImagePromises: [String: Promise<Data>] = [:]
@ -44,13 +44,13 @@ public final class OpenGroupAPI: NSObject {
public static func poll(_ server: String, using dependencies: Dependencies = Dependencies()) -> Promise<[Endpoint: (OnionRequestResponseInfoType, Codable?)]> { public static func poll(_ server: String, using dependencies: Dependencies = Dependencies()) -> Promise<[Endpoint: (OnionRequestResponseInfoType, Codable?)]> {
// Store a local copy of the cached state for this server // Store a local copy of the cached state for this server
let hadPerformedInitialPoll: Bool = (hasPerformedInitialPoll[server] == true) let hadPerformedInitialPoll: Bool = (hasPerformedInitialPoll[server] == true)
let originalTimeSinceLastPoll: TimeInterval = (timeSinceLastPoll[server] ?? min(lastPollTime, timeSinceLastOpen)) let originalTimeSinceLastPoll: TimeInterval = (timeSinceLastPoll[server] ?? min(lastPollTime.wrappedValue, timeSinceLastOpen.wrappedValue))
let maybeLastInboxMessageId: Int64? = dependencies.storage.getOpenGroupInboxLatestMessageId(for: server) let maybeLastInboxMessageId: Int64? = dependencies.storage.getOpenGroupInboxLatestMessageId(for: server)
let lastInboxMessageId: Int64 = (maybeLastInboxMessageId ?? 0) let lastInboxMessageId: Int64 = (maybeLastInboxMessageId ?? 0)
// Update the cached state for this server // Update the cached state for this server
hasPerformedInitialPoll[server] = true hasPerformedInitialPoll.wrappedValue[server] = true
lastPollTime = min(lastPollTime, timeSinceLastOpen) lastPollTime.wrappedValue = min(lastPollTime.wrappedValue, timeSinceLastOpen.wrappedValue)
UserDefaults.standard[.lastOpen] = Date() UserDefaults.standard[.lastOpen] = Date()
// Generate the requests // Generate the requests

View file

@ -45,7 +45,6 @@ public final class OpenGroupManager: NSObject {
// Clear any existing data if needed // Clear any existing data if needed
storage.removeOpenGroupSequenceNumber(for: roomToken, on: server, using: transaction) storage.removeOpenGroupSequenceNumber(for: roomToken, on: server, using: transaction)
storage.removeAuthToken(for: roomToken, on: server, using: transaction)
// Store the public key // Store the public key
storage.setOpenGroupPublicKey(for: server, to: publicKey, using: transaction) storage.setOpenGroupPublicKey(for: server, to: publicKey, using: transaction)

View file

@ -843,7 +843,7 @@ extension MessageReceiver {
public static func handleMessageRequestResponse(_ message: MessageRequestResponse, using transaction: Any) { public static func handleMessageRequestResponse(_ message: MessageRequestResponse, using transaction: Any) {
let userPublicKey = getUserHexEncodedPublicKey() let userPublicKey = getUserHexEncodedPublicKey()
var blindedContactIds: [String] = [] var hadBlindedContact: Bool = false
var blindedThreadIds: [String] = [] var blindedThreadIds: [String] = []
// Ignore messages which were sent from the current user // Ignore messages which were sent from the current user
@ -882,11 +882,16 @@ extension MessageReceiver {
let mapping: BlindedIdMapping = BlindedIdMapping(blindedId: blindedId, sessionId: senderId, serverPublicKey: serverPublicKey) let mapping: BlindedIdMapping = BlindedIdMapping(blindedId: blindedId, sessionId: senderId, serverPublicKey: serverPublicKey)
Storage.shared.cacheBlindedIdMapping(mapping, using: transaction) Storage.shared.cacheBlindedIdMapping(mapping, using: transaction)
// Add the `blindedId` to an array so we can remove them at the end of processing // Flag that we had a blinded contact and add the `blindedThreadId` to an array so we can remove
blindedContactIds.append(blindedId) // them at the end of processing
hadBlindedContact = true
blindedThreadIds.append(blindedThreadId) blindedThreadIds.append(blindedThreadId)
// Loop through all of the interactions and add them to a list to be moved to the new thread // Loop through all of the interactions and add them to a list to be moved to the new thread
// Note: Pending `MessageSendJobs` _shouldn't_ be an issue as even if they are sent after the
// un-blinding of a thread, the logic when handling the sent messages should automatically
// assign them to the correct thread
// TODO: Validate the above note once `/outbox` has been implemented
view.enumerateRows(inGroup: blindedThreadId) { _, _, object, _, _, _ in view.enumerateRows(inGroup: blindedThreadId) { _, _, object, _, _, _ in
guard let interaction: TSInteraction = object as? TSInteraction else { guard let interaction: TSInteraction = object as? TSInteraction else {
return return
@ -896,9 +901,6 @@ extension MessageReceiver {
} }
threadsToDelete.append(blindedThread) threadsToDelete.append(blindedThread)
// TODO: Pending jobs???
// Storage.shared.getAllPendingJobs(of: <#T##Job.Type#>)
} }
// Sort the interactions by their `sortId` (which looks to be a global sort id for all interactions) just in case // Sort the interactions by their `sortId` (which looks to be a global sort id for all interactions) just in case
@ -916,7 +918,6 @@ extension MessageReceiver {
// Delete the old threads // Delete the old threads
for thread in threadsToDelete { for thread in threadsToDelete {
// TODO: This isn't updating the HomeVC... Race condition??? (Seems to not happen when stepping through with breakpoints)
thread.removeAllThreadInteractions(with: transaction) thread.removeAllThreadInteractions(with: transaction)
thread.remove(with: transaction) thread.remove(with: transaction)
} }
@ -926,19 +927,13 @@ extension MessageReceiver {
updateContactApprovalStatusIfNeeded( updateContactApprovalStatusIfNeeded(
senderSessionId: senderId, senderSessionId: senderId,
threadId: nil, threadId: nil,
forceConfigSync: blindedContactIds.isEmpty, // Sync here if there are no blinded contacts forceConfigSync: !hadBlindedContact, // Sync here if there were no blinded contacts
using: transaction using: transaction
) )
// If there were blinded contacts then we should remove them // If there were blinded contacts then we need to assume that the 'sender' is a newly create contact and hence
if !blindedContactIds.isEmpty { // need to update it's `isApproved` state
// Delete all of the processed blinded contacts (shouldn't need them anymore and don't want them taking up if hadBlindedContact {
// space in the config message)
for blindedId in blindedContactIds {
// TODO: OWSBlockingManager...???
}
// We should assume the 'sender' is a newly created contact and hence need to update it's `isApproved` state
updateContactApprovalStatusIfNeeded( updateContactApprovalStatusIfNeeded(
senderSessionId: userPublicKey, senderSessionId: userPublicKey,
threadId: unblindedThreadId, threadId: unblindedThreadId,

View file

@ -38,12 +38,6 @@ public protocol SessionMessagingKitStorageProtocol {
func resumeMessageSendJobIfNeeded(_ messageSendJobID: String) func resumeMessageSendJobIfNeeded(_ messageSendJobID: String)
func isJobCanceled(_ job: Job) -> Bool func isJobCanceled(_ job: Job) -> Bool
// MARK: - Authorization
func getAuthToken(for room: String, on server: String) -> String?
func setAuthToken(for room: String, on server: String, to newValue: String, using transaction: Any)
func removeAuthToken(for room: String, on server: String, using transaction: Any)
// MARK: - Open Groups // MARK: - Open Groups
func getAllOpenGroups() -> [String: OpenGroup] func getAllOpenGroups() -> [String: OpenGroup]

View file

@ -2,26 +2,95 @@
import Foundation import Foundation
/// The `Atomic<T>` wrapper is a generic wrapper providing a thread-safe way to get and set a value /// See https://www.donnywals.com/why-your-atomic-property-wrapper-doesnt-work-for-collection-types/
/// for more information about the below types
protocol UnsupportedType {}
extension Array: UnsupportedType {}
extension Set: UnsupportedType {}
extension Dictionary: UnsupportedType {}
// MARK: - Atomic<Value>
/// The `Atomic<Value>` wrapper is a generic wrapper providing a thread-safe way to get and set a value
@propertyWrapper @propertyWrapper
struct Atomic<Value> { struct Atomic<Value> {
private let lock = DispatchSemaphore(value: 1) private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global())
private var value: Value private var value: Value
init(_ initialValue: Value) { init(_ initialValue: Value) {
if initialValue is UnsupportedType { preconditionFailure("Use the appropriate Aromic... type for collections") }
self.value = initialValue self.value = initialValue
} }
var wrappedValue: Value { var wrappedValue: Value {
get { get { return queue.sync { return value } }
lock.wait() set { return queue.sync { value = newValue } }
defer { lock.signal() } }
return value }
}
set { extension Atomic where Value: CustomDebugStringConvertible {
lock.wait() var debugDescription: String {
value = newValue return value.debugDescription
lock.signal() }
} }
// MARK: - AtomicArray<Value>
/// The `AtomicArray<Value>` wrapper is a generic wrapper providing a thread-safe way to get and set an array or one of it's values
///
/// Note: This is a class rather than a struct as you need to modify a reference rather than a copy for the concurrency to work
@propertyWrapper
class AtomicArray<Value>: CustomDebugStringConvertible {
private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global())
private var value: [Value]
init(_ initialValue: [Value] = []) {
self.value = initialValue
}
var wrappedValue: [Value] {
get { return queue.sync { return value } }
set { return queue.sync { value = newValue } }
}
subscript(index: Int) -> Value {
get { queue.sync { value[index] }}
set { queue.async(flags: .barrier) { self.value[index] = newValue } }
}
public var debugDescription: String {
return value.debugDescription
}
}
// MARK: - AtomicDict<Key, Value>
/// The `AtomicDict<Key, Value>` wrapper is a generic wrapper providing a thread-safe way to get and set a dictionaries or one of it's values
///
/// Note: This is a class rather than a struct as you need to modify a reference rather than a copy for the concurrency to work
@propertyWrapper
class AtomicDict<Key: Hashable, Value>: CustomDebugStringConvertible {
private let queue: DispatchQueue = DispatchQueue(label: "io.oxen.\(UUID().uuidString)", qos: .utility, attributes: .concurrent, autoreleaseFrequency: .inherit, target: .global())
private var value: [Key: Value]
init(_ initialValue: [Key: Value] = [:]) {
self.value = initialValue
}
var wrappedValue: [Key: Value] {
get { return queue.sync { return value } }
set { return queue.sync { value = newValue } }
}
subscript(key: Key) -> Value? {
get { queue.sync { value[key] }}
set { queue.async(flags: .barrier) { self.value[key] = newValue } }
}
var debugDescription: String {
return value.debugDescription
} }
} }