Fixed a couple of QA issues

Forced the user config feature to be on (for testing)
Fixed a bug where triggering the 'Delete for everyone' functionality would incorrectly try to delete from the recipient swarm (not possible)
Fixed a bug where the 'profileNamePublisher' could only be set once resulting in potential issues if you try to restore different accounts within the same session
Re-added the limit to the number of reactions to display before collapsing to make it consistent with the designs and other platforms
Updated the SnodeAPI to ensure that when it retries it will actually select a new snode
This commit is contained in:
Morgan Pretty 2023-05-23 09:42:10 +10:00
parent 19eddd79a1
commit 3f362a71f3
17 changed files with 301 additions and 207 deletions

View File

@ -1921,6 +1921,10 @@ extension ConversationVC:
}
case .contact, .legacyGroup, .group:
let targetPublicKey: String = (cellViewModel.threadVariant == .contact ?
userPublicKey :
cellViewModel.threadId
)
let serverHash: String? = Storage.shared.read { db -> String? in
try Interaction
.select(.serverHash)
@ -1996,16 +2000,7 @@ extension ConversationVC:
accessibilityIdentifier: "Delete for everyone",
style: .destructive
) { [weak self] _ in
deleteRemotely(
from: self,
request: SnodeAPI
.deleteMessages(
publicKey: cellViewModel.threadId,
serverHashes: [serverHash]
)
.map { _ in () }
.eraseToAnyPublisher()
) { [weak self] in
let completeServerDeletion = { [weak self] in
Storage.shared.writeAsync { db in
try MessageSender
.send(
@ -2019,6 +2014,22 @@ extension ConversationVC:
self?.showInputAccessoryView()
}
// We can only delete messages on the server for `contact` and `group` conversations
guard cellViewModel.threadVariant == .contact || cellViewModel.threadVariant == .group else {
return completeServerDeletion()
}
deleteRemotely(
from: self,
request: SnodeAPI
.deleteMessages(
publicKey: targetPublicKey,
serverHashes: [serverHash]
)
.map { _ in () }
.eraseToAnyPublisher()
) { completeServerDeletion() }
})
actionSheet.addAction(UIAlertAction.init(title: "TXT_CANCEL_TITLE".localized(), style: .cancel) { [weak self] _ in

View File

@ -9,6 +9,13 @@ final class ReactionContainerView: UIView {
private static let arrowSize: CGSize = CGSize(width: 15, height: 13)
private static let arrowSpacing: CGFloat = Values.verySmallSpacing
// We have explicit limits on the number of emoji which should be displayed before they
// automatically get collapsed, these values are consistent across platforms so are set
// here (even though the logic will automatically calculate and limit to a single line
// of reactions dynamically for the size of the view)
private static let numCollapsedEmoji: Int = 4
private static let maxEmojiBeforeCollapse: Int = 6
private var maxWidth: CGFloat = 0
private var collapsedCount: Int = 0
private var showingAllReactions: Bool = false
@ -173,7 +180,10 @@ final class ReactionContainerView: UIView {
numReactions += 1
}
return numReactions
return (numReactions > ReactionContainerView.maxEmojiBeforeCollapse ?
ReactionContainerView.numCollapsedEmoji :
numReactions
)
}()
self.showNumbers = showNumbers
self.reactionViews = []

View File

@ -71,7 +71,8 @@ public struct SessionApp {
// This _should_ be wiped out below.
Logger.error("")
DDLog.flushLog()
SessionUtil.clearMemoryState()
Storage.resetAllStorage()
ProfileManager.resetProfileStorage()
Attachment.resetAttachmentStorage()

View File

@ -86,6 +86,12 @@ final class LinkDeviceVC: BaseVC, UIPageViewControllerDataSource, UIPageViewCont
scanQRCodePlaceholderVC.constrainHeight(to: height)
}
override func viewWillAppear(_ animated: Bool) {
super.viewWillAppear(animated)
Onboarding.Flow.register.unregister()
}
override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
tabBarTopConstraint.constant = navigationController!.navigationBar.height()

View File

@ -8,111 +8,152 @@ import SessionUtilitiesKit
import SessionMessagingKit
enum Onboarding {
private static let profileNameRetrievalPublisher: Atomic<AnyPublisher<String?, Error>> = {
private static let profileNameRetrievalIdentifier: Atomic<UUID?> = Atomic(nil)
private static let profileNameRetrievalPublisher: Atomic<AnyPublisher<String?, Error>?> = Atomic(nil)
public static var profileNamePublisher: AnyPublisher<String?, Error> {
guard let existingPublisher: AnyPublisher<String?, Error> = profileNameRetrievalPublisher.wrappedValue else {
return profileNameRetrievalPublisher.mutate { value in
let requestId: UUID = UUID()
let result: AnyPublisher<String?, Error> = createProfileNameRetrievalPublisher(requestId)
value = result
profileNameRetrievalIdentifier.mutate { $0 = requestId }
return result
}
}
return existingPublisher
}
private static func createProfileNameRetrievalPublisher(_ requestId: UUID) -> AnyPublisher<String?, Error> {
// FIXME: Remove this once `useSharedUtilForUserConfig` is permanent
guard SessionUtil.userConfigsEnabled else {
return Atomic(
Just(nil)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
)
return Just(nil)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
let userPublicKey: String = getUserHexEncodedPublicKey()
return Atomic(
SnodeAPI.getSwarm(for: userPublicKey)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.tryFlatMap { swarm -> AnyPublisher<Void, Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return CurrentUserPoller
.poll(
namespaces: [.configUserProfile],
from: snode,
for: userPublicKey,
on: DispatchQueue.global(qos: .userInitiated),
// Note: These values mean the received messages will be
// processed immediately rather than async as part of a Job
calledFromBackgroundPoller: true,
isBackgroundPollValid: { true }
)
.tryFlatMap { receivedMessageTypes -> AnyPublisher<Void, Error> in
// FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough
guard receivedMessageTypes.isEmpty else {
return Just(())
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
SNLog("Onboarding failed to retrieve user config, checking for legacy config")
return CurrentUserPoller
.poll(
namespaces: [.default],
from: snode,
for: userPublicKey,
on: DispatchQueue.global(qos: .userInitiated),
// Note: These values mean the received messages will be
// processed immediately rather than async as part of a Job
calledFromBackgroundPoller: true,
isBackgroundPollValid: { true }
)
.tryMap { receivedMessageTypes -> Void in
guard
let message: ConfigurationMessage = receivedMessageTypes
.last(where: { $0 is ConfigurationMessage })
.asType(ConfigurationMessage.self),
let displayName: String = message.displayName
else { return () }
// Handle user profile changes
Storage.shared.write { db in
try ProfileManager.updateProfileIfNeeded(
db,
publicKey: userPublicKey,
name: displayName,
avatarUpdate: {
guard
let profilePictureUrl: String = message.profilePictureUrl,
let profileKey: Data = message.profileKey
else { return .none }
return .updateTo(
url: profilePictureUrl,
key: profileKey,
fileName: nil
)
}(),
sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000),
calledFromConfigHandling: false
)
}
return ()
}
return SnodeAPI.getSwarm(for: userPublicKey)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.tryFlatMapWithRandomSnode { snode -> AnyPublisher<Void, Error> in
CurrentUserPoller
.poll(
namespaces: [.configUserProfile],
from: snode,
for: userPublicKey,
on: DispatchQueue.global(qos: .userInitiated),
// Note: These values mean the received messages will be
// processed immediately rather than async as part of a Job
calledFromBackgroundPoller: true,
isBackgroundPollValid: { true }
)
.tryFlatMap { receivedMessageTypes -> AnyPublisher<Void, Error> in
// FIXME: Remove this entire 'tryFlatMap' once the updated user config has been released for long enough
guard
receivedMessageTypes.isEmpty,
requestId == profileNameRetrievalIdentifier.wrappedValue
else {
return Just(())
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
}
.flatMap { _ -> AnyPublisher<String?, Error> in
Storage.shared.readPublisher { db in
try Profile
.filter(id: userPublicKey)
.select(.name)
.asRequest(of: String.self)
.fetchOne(db)
SNLog("Onboarding failed to retrieve user config, checking for legacy config")
return CurrentUserPoller
.poll(
namespaces: [.default],
from: snode,
for: userPublicKey,
on: DispatchQueue.global(qos: .userInitiated),
// Note: These values mean the received messages will be
// processed immediately rather than async as part of a Job
calledFromBackgroundPoller: true,
isBackgroundPollValid: { true }
)
.tryMap { receivedMessageTypes -> Void in
guard
let message: ConfigurationMessage = receivedMessageTypes
.last(where: { $0 is ConfigurationMessage })
.asType(ConfigurationMessage.self),
let displayName: String = message.displayName,
requestId == profileNameRetrievalIdentifier.wrappedValue
else { return () }
// Handle user profile changes
Storage.shared.write { db in
try ProfileManager.updateProfileIfNeeded(
db,
publicKey: userPublicKey,
name: displayName,
avatarUpdate: {
guard
let profilePictureUrl: String = message.profilePictureUrl,
let profileKey: Data = message.profileKey
else { return .none }
return .updateTo(
url: profilePictureUrl,
key: profileKey,
fileName: nil
)
}(),
sentTimestamp: TimeInterval((message.sentTimestamp ?? 0) / 1000),
calledFromConfigHandling: false
)
}
return ()
}
.eraseToAnyPublisher()
}
}
.map { _ -> String? in
guard requestId == profileNameRetrievalIdentifier.wrappedValue else {
return nil
}
.shareReplay(1)
.eraseToAnyPublisher()
)
}()
public static var profileNamePublisher: AnyPublisher<String?, Error> {
profileNameRetrievalPublisher.wrappedValue
return Storage.shared.read { db in
try Profile
.filter(id: userPublicKey)
.select(.name)
.asRequest(of: String.self)
.fetchOne(db)
}
}
.shareReplay(1)
.eraseToAnyPublisher()
}
enum Flow {
case register, recover, link
/// If the user returns to an earlier screen during Onboarding we might need to clear out a partially created
/// account (eg. returning from the PN setting screen to the seed entry screen when linking a device)
func unregister() {
// Clear the in-memory state from SessionUtil
SessionUtil.clearMemoryState()
// Clear any data which gets set during Onboarding
Storage.shared.write { db in
db[.hasViewedSeed] = false
try SessionThread.deleteAll(db)
try Profile.deleteAll(db)
try Contact.deleteAll(db)
try Identity.deleteAll(db)
try ConfigDump.deleteAll(db)
try SnodeReceivedMessageInfo.deleteAll(db)
}
// Clear the profile name retrieve publisher
profileNameRetrievalIdentifier.mutate { $0 = nil }
profileNameRetrievalPublisher.mutate { $0 = nil }
UserDefaults.standard[.hasSyncedInitialConfiguration] = false
}
func preregister(with seed: Data, ed25519KeyPair: KeyPair, x25519KeyPair: KeyPair) {
let x25519PublicKey = x25519KeyPair.hexEncodedPublicKey

View File

@ -174,7 +174,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate {
}
// If we don't have one then show a loading indicator and try to retrieve the existing name
ModalActivityIndicatorViewController.present(fromViewController: self) { viewController in
ModalActivityIndicatorViewController.present(fromViewController: self) { [weak self, flow = self.flow] viewController in
Onboarding.profileNamePublisher
.timeout(.seconds(15), scheduler: DispatchQueue.main, customError: { HTTPError.timeout })
.catch { _ -> AnyPublisher<String?, Error> in
@ -185,7 +185,7 @@ final class PNModeVC: BaseVC, OptionViewDelegate {
}
.receive(on: DispatchQueue.main)
.sinkUntilComplete(
receiveValue: { [weak self, flow = self.flow] value in
receiveValue: { value in
// Hide the loading indicator
viewController.dismiss(animated: true)

View File

@ -151,6 +151,12 @@ final class RegisterVC : BaseVC {
updateSeed()
}
override func viewWillAppear(_ animated: Bool) {
super.viewWillAppear(animated)
Onboarding.Flow.register.unregister()
}
// MARK: General
@objc private func enableCopyButton() {
copyPublicKeyButton.isUserInteractionEnabled = true

View File

@ -128,8 +128,15 @@ final class RestoreVC: BaseVC {
notificationCenter.addObserver(self, selector: #selector(handleKeyboardWillHideNotification(_:)), name: UIResponder.keyboardWillHideNotification, object: nil)
}
override func viewWillAppear(_ animated: Bool) {
super.viewWillAppear(animated)
Onboarding.Flow.register.unregister()
}
override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
// On small screens we hide the legal label when the keyboard is up, but it's important that the user sees it so
// in those instances we don't make the keyboard come up automatically
if !isIPhone5OrSmaller {

View File

@ -67,10 +67,8 @@ public final class BackgroundPoller {
return SnodeAPI.getSwarm(for: userPublicKey)
.subscribeOnMain(immediately: true)
.receiveOnMain(immediately: true)
.tryFlatMap { swarm -> AnyPublisher<[Message], Error> in
guard let snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return CurrentUserPoller.poll(
.tryFlatMapWithRandomSnode { snode -> AnyPublisher<[Message], Error> in
CurrentUserPoller.poll(
namespaces: CurrentUserPoller.namespaces,
from: snode,
for: userPublicKey,

View File

@ -846,10 +846,9 @@ public extension Interaction {
.asRequest(of: Attachment.DescriptionInfo.self)
.fetchOne(db),
attachmentCount: try? attachments.fetchCount(db),
isOpenGroupInvitation: (try? linkPreview
isOpenGroupInvitation: linkPreview
.filter(LinkPreview.Columns.variant == LinkPreview.Variant.openGroupInvitation)
.isNotEmpty(db))
.defaulting(to: false)
.isNotEmpty(db)
)
case .infoMediaSavedNotification, .infoScreenshotNotification, .infoCall:

View File

@ -31,7 +31,16 @@ public enum MessageSendJob: JobExecutor {
// so extract them from any associated attachments
var messageFileIds: [String] = []
if details.message is VisibleMessage {
/// Ensure any associated attachments have already been uploaded before sending the message
///
/// **Note:** Reactions reference their original message so we need to ignore this logic for reaction messages to ensure we don't
/// incorrectly re-upload incoming attachments that the user reacted to, we also want to exclude "sync" messages since they should
/// already have attachments in a valid state
if
details.message is VisibleMessage,
(details.message as? VisibleMessage)?.reaction == nil &&
details.isSyncMessage == false
{
guard
let jobId: Int64 = job.id,
let interactionId: Int64 = job.interactionId

View File

@ -10,6 +10,7 @@ import SessionUtilitiesKit
public extension Features {
static func useSharedUtilForUserConfig(_ db: Database? = nil) -> Bool {
return true
// TODO: Need to set this timestamp to the correct date (currently start of 2030)
// guard Date().timeIntervalSince1970 < 1893456000 else { return true }
guard !SessionUtil.hasCheckedMigrationsCompleted.wrappedValue else {
@ -141,6 +142,15 @@ public enum SessionUtil {
// MARK: - Loading
public static func clearMemoryState() {
// Ensure we have a loaded state before we continue
guard !SessionUtil.configStore.wrappedValue.isEmpty else { return }
SessionUtil.configStore.mutate { confStore in
confStore.removeAll()
}
}
public static func loadState(
_ db: Database? = nil,
userPublicKey: String,

View File

@ -639,10 +639,8 @@ public final class SnodeAPI {
}
return getSwarm(for: publicKey)
.tryFlatMap { swarm -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return try sendMessage(to: snode)
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<(ResponseInfoType, SendMessagesResponse), Error> in
try sendMessage(to: snode)
.tryMap { info, response -> (ResponseInfoType, SendMessagesResponse) in
try response.validateResultMap(
sodium: sodium.wrappedValue,
@ -651,11 +649,8 @@ public final class SnodeAPI {
return (info, response)
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
public static func sendConfigMessages(
@ -732,10 +727,8 @@ public final class SnodeAPI {
let responseTypes = requests.map { $0.responseType }
return getSwarm(for: publicKey)
.tryFlatMap { swarm -> AnyPublisher<HTTP.BatchResponse, Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<HTTP.BatchResponse, Error> in
SnodeAPI
.send(
request: SnodeRequest(
endpoint: .sequence,
@ -749,8 +742,6 @@ public final class SnodeAPI {
.decoded(as: responseTypes, requireAllResults: false, using: dependencies)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
// MARK: - Edit
@ -768,10 +759,8 @@ public final class SnodeAPI {
return getSwarm(for: publicKey)
.subscribe(on: Threading.workQueue)
.tryFlatMap { swarm -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: [(hash: String, expiry: UInt64)]], Error> in
SnodeAPI
.send(
request: SnodeRequest(
endpoint: .expire,
@ -796,11 +785,8 @@ public final class SnodeAPI {
validationData: serverHashes
)
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
public static func revokeSubkey(
@ -815,10 +801,8 @@ public final class SnodeAPI {
return getSwarm(for: publicKey)
.subscribe(on: Threading.workQueue)
.tryFlatMap { swarm -> AnyPublisher<Void, Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return SnodeAPI
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<Void, Error> in
SnodeAPI
.send(
request: SnodeRequest(
endpoint: .revokeSubkey,
@ -843,11 +827,8 @@ public final class SnodeAPI {
return ()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
// MARK: Delete
@ -866,61 +847,46 @@ public final class SnodeAPI {
return getSwarm(for: publicKey)
.subscribe(on: Threading.workQueue)
.flatMap { swarm -> AnyPublisher<[String: Bool], Error> in
Just(())
.setFailureType(to: Error.self)
.tryMap { _ -> Snode in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return snode
}
.flatMap { snode -> AnyPublisher<[String: Bool], Error> in
SnodeAPI
.send(
request: SnodeRequest(
endpoint: .deleteMessages,
body: DeleteMessagesRequest(
messageHashes: serverHashes,
requireSuccessfulDeletion: false,
pubkey: userX25519PublicKey,
ed25519PublicKey: userED25519KeyPair.publicKey,
ed25519SecretKey: userED25519KeyPair.secretKey
)
),
to: snode,
associatedWith: publicKey,
using: dependencies
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in
SnodeAPI
.send(
request: SnodeRequest(
endpoint: .deleteMessages,
body: DeleteMessagesRequest(
messageHashes: serverHashes,
requireSuccessfulDeletion: false,
pubkey: userX25519PublicKey,
ed25519PublicKey: userED25519KeyPair.publicKey,
ed25519SecretKey: userED25519KeyPair.secretKey
)
.subscribe(on: Threading.workQueue)
.eraseToAnyPublisher()
.decoded(as: DeleteMessagesResponse.self, using: dependencies)
.tryMap { _, response -> [String: Bool] in
let validResultMap: [String: Bool] = try response.validResultMap(
sodium: sodium.wrappedValue,
userX25519PublicKey: userX25519PublicKey,
validationData: serverHashes
)
// If `validResultMap` didn't throw then at least one service node
// deleted successfully so we should mark the hash as invalid so we
// don't try to fetch updates using that hash going forward (if we
// do we would end up re-fetching all old messages)
Storage.shared.writeAsync { db in
try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,
potentiallyInvalidHashes: serverHashes
)
}
return validResultMap
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
),
to: snode,
associatedWith: publicKey,
using: dependencies
)
.decoded(as: DeleteMessagesResponse.self, using: dependencies)
.tryMap { _, response -> [String: Bool] in
let validResultMap: [String: Bool] = try response.validResultMap(
sodium: sodium.wrappedValue,
userX25519PublicKey: userX25519PublicKey,
validationData: serverHashes
)
// If `validResultMap` didn't throw then at least one service node
// deleted successfully so we should mark the hash as invalid so we
// don't try to fetch updates using that hash going forward (if we
// do we would end up re-fetching all old messages)
Storage.shared.writeAsync { db in
try? SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,
potentiallyInvalidHashes: serverHashes
)
}
return validResultMap
}
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
/// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation.
@ -937,10 +903,8 @@ public final class SnodeAPI {
return getSwarm(for: userX25519PublicKey)
.subscribe(on: Threading.workQueue)
.tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return getNetworkTime(from: snode)
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in
getNetworkTime(from: snode)
.flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in
SnodeAPI
.send(
@ -968,11 +932,8 @@ public final class SnodeAPI {
}
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
/// Clears all the user's data from their swarm. Returns a dictionary of snode public key to deletion confirmation.
@ -990,10 +951,8 @@ public final class SnodeAPI {
return getSwarm(for: userX25519PublicKey)
.subscribe(on: Threading.workQueue)
.tryFlatMap { swarm -> AnyPublisher<[String: Bool], Error> in
guard let snode: Snode = swarm.randomElement() else { throw SnodeAPIError.generic }
return getNetworkTime(from: snode)
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<[String: Bool], Error> in
getNetworkTime(from: snode)
.flatMap { timestampMs -> AnyPublisher<[String: Bool], Error> in
SnodeAPI
.send(
@ -1022,11 +981,8 @@ public final class SnodeAPI {
}
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
.retry(maxRetryCount)
.eraseToAnyPublisher()
}
// MARK: - Internal API
@ -1377,3 +1333,31 @@ public final class SNSnodeAPI: NSObject {
return UInt64(SnodeAPI.currentOffsetTimestampMs())
}
}
// MARK: - Convenience
public extension Publisher where Output == Set<Snode> {
func tryFlatMapWithRandomSnode<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
retry retries: Int = 0,
_ transform: @escaping (Snode) throws -> P
) -> AnyPublisher<T, Error> where T == P.Output, P: Publisher, P.Failure == Error {
return self
.mapError { $0 }
.flatMap(maxPublishers: maxPublishers) { swarm -> AnyPublisher<T, Error> in
var remainingSnodes: Set<Snode> = swarm
return Just(())
.setFailureType(to: Error.self)
.tryFlatMap(maxPublishers: maxPublishers) { _ -> AnyPublisher<T, Error> in
let snode: Snode = try remainingSnodes.popRandomElement() ?? { throw SnodeAPIError.generic }()
return try transform(snode)
.eraseToAnyPublisher()
}
.retry(retries)
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
}

View File

@ -137,7 +137,7 @@ public extension AnyPublisher {
// MARK: - Data Decoding
public extension AnyPublisher where Output == Data, Failure == Error {
public extension Publisher where Output == Data, Failure == Error {
func decoded<R: Decodable>(
as type: R.Type,
using dependencies: Dependencies = Dependencies()
@ -148,7 +148,7 @@ public extension AnyPublisher where Output == Data, Failure == Error {
}
}
public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error {
public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error {
func decoded<R: Decodable>(
as type: R.Type,
using dependencies: Dependencies = Dependencies()

View File

@ -29,4 +29,11 @@ public extension Set {
return updatedSet
}
mutating func popRandomElement() -> Element? {
guard let value: Element = randomElement() else { return nil }
self.remove(value)
return value
}
}

View File

@ -537,6 +537,11 @@ private final class JobQueue {
}
queue.mutate { $0.append(job) }
// If this is a concurrent queue then we should immediately start the next job
guard executionType == .concurrent else { return }
runNextJob()
}
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start

View File

@ -78,7 +78,7 @@ public extension Decodable {
}
}
public extension AnyPublisher where Output == (ResponseInfoType, Data?), Failure == Error {
public extension Publisher where Output == (ResponseInfoType, Data?), Failure == Error {
func decoded(
as types: HTTP.BatchResponseTypes,
requireAllResults: Bool = true,