session-ios/SessionUtilitiesKit/Combine/ReplaySubject.swift

124 lines
4.6 KiB
Swift

// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Combine
/// A subject that stores the last `bufferSize` emissions and emits them for every new subscriber
///
/// Note: This implementation was found here: https://github.com/sgl0v/OnSwiftWings
public final class ReplaySubject<Output, Failure: Error>: Subject {
private var buffer: [Output] = [Output]()
private let bufferSize: Int
private let lock: NSRecursiveLock = NSRecursiveLock()
private var subscriptions: Atomic<[ReplaySubjectSubscription<Output, Failure>]> = Atomic([])
private var completion: Subscribers.Completion<Failure>?
// MARK: - Initialization
init(_ bufferSize: Int = 0) {
self.bufferSize = bufferSize
}
// MARK: - Subject Methods
/// Sends a value to the subscriber
public func send(_ value: Output) {
lock.lock(); defer { lock.unlock() }
buffer.append(value)
buffer = buffer.suffix(bufferSize)
subscriptions.wrappedValue.forEach { $0.receive(value) }
}
/// Sends a completion signal to the subscriber
public func send(completion: Subscribers.Completion<Failure>) {
lock.lock(); defer { lock.unlock() }
self.completion = completion
subscriptions.wrappedValue.forEach { $0.receive(completion: completion) }
}
/// Provides this Subject an opportunity to establish demand for any new upstream subscriptions
public func send(subscription: Subscription) {
lock.lock(); defer { lock.unlock() }
subscription.request(.unlimited)
}
/// This function is called to attach the specified `Subscriber` to the`Publisher
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
lock.lock(); defer { lock.unlock() }
/// According to the below comment the `subscriber.receive(subscription: subscription)` code runs asynchronously
/// which aligns with testing (resulting in the `request(_ newDemand: Subscribers.Demand)` function getting called after this
/// function returns
///
/// Later in the thread it's mentioned that as of `iOS 13.3` this behaviour changed to be synchronous but as of writing the minimum
/// deployment version is set to `iOS 13.0` which I assume is why we are seeing the async behaviour which results in `receiveValue`
/// not being called in some cases
///
/// When the project is eventually updated to have a minimum version higher than `iOS 13.3` we should re-test this behaviour to see if
/// we can revert this change
///
/// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631/20
let subscription: ReplaySubjectSubscription = ReplaySubjectSubscription<Output, Failure>(downstream: AnySubscriber(subscriber)) { [weak self, buffer = buffer, completion = completion] subscription in
self?.subscriptions.mutate { $0.append(subscription) }
subscription.replay(buffer, completion: completion)
}
subscriber.receive(subscription: subscription)
}
}
// MARK: -
public final class ReplaySubjectSubscription<Output, Failure: Error>: Subscription {
private let downstream: AnySubscriber<Output, Failure>
private var isCompleted: Bool = false
private var demand: Subscribers.Demand = .none
private var onInitialDemand: ((ReplaySubjectSubscription) -> ())?
// MARK: - Initialization
init(downstream: AnySubscriber<Output, Failure>, onInitialDemand: @escaping (ReplaySubjectSubscription) -> ()) {
self.downstream = downstream
self.onInitialDemand = onInitialDemand
}
// MARK: - Subscription
public func request(_ newDemand: Subscribers.Demand) {
demand += newDemand
onInitialDemand?(self)
onInitialDemand = nil
}
public func cancel() {
isCompleted = true
}
// MARK: - Functions
public func receive(_ value: Output) {
guard !isCompleted, demand > 0 else { return }
demand += downstream.receive(value)
demand -= 1
}
public func receive(completion: Subscribers.Completion<Failure>) {
guard !isCompleted else { return }
isCompleted = true
downstream.receive(completion: completion)
}
public func replay(_ values: [Output], completion: Subscribers.Completion<Failure>?) {
guard !isCompleted else { return }
values.forEach { value in receive(value) }
if let completion = completion {
receive(completion: completion)
}
}
}