mirror of
https://github.com/oxen-io/session-ios.git
synced 2023-12-13 21:30:14 +01:00
f1de95ab06
FREEBIE
125 lines
3.4 KiB
Objective-C
125 lines
3.4 KiB
Objective-C
#import "ObservableValue.h"
|
|
#import "Util.h"
|
|
#import "Environment.h"
|
|
|
|
@implementation ObservableValue
|
|
|
|
@synthesize currentValue;
|
|
|
|
-(ObservableValue*) initWithValue:(id)value {
|
|
callbacks = [NSMutableSet set];
|
|
queuedActionsToRun = [Queue new];
|
|
currentValue = value;
|
|
return self;
|
|
}
|
|
|
|
-(void) watchLatestValueOnArbitraryThread:(LatestValueCallback)callback
|
|
untilCancelled:(TOCCancelToken*)untilCancelledToken {
|
|
|
|
require(callback != nil);
|
|
if (untilCancelledToken.isAlreadyCancelled) return;
|
|
|
|
void(^callbackCopy)(id value) = [callback copy];
|
|
[self queueRun:^{
|
|
callbackCopy(self.currentValue);
|
|
[callbacks addObject:callbackCopy];
|
|
}];
|
|
[untilCancelledToken whenCancelledDo:^{
|
|
[self queueRun:^{
|
|
[callbacks removeObject:callbackCopy];
|
|
}];
|
|
}];
|
|
}
|
|
-(void) watchLatestValue:(LatestValueCallback)callback
|
|
onThread:(NSThread*)thread
|
|
untilCancelled:(TOCCancelToken*)untilCancelledToken {
|
|
|
|
require(callback != nil);
|
|
require(thread != nil);
|
|
|
|
void(^callbackCopy)(id value) = [callback copy];
|
|
void(^threadedCallback)(id value) = ^(id value) {
|
|
[Operation asyncRun:^{callbackCopy(value);} onThread:thread];
|
|
};
|
|
|
|
[self watchLatestValueOnArbitraryThread:threadedCallback
|
|
untilCancelled:untilCancelledToken];
|
|
}
|
|
|
|
/// used for avoiding re-entrancy issues (e.g. a callback registering another callback during enumeration)
|
|
-(void) queueRun:(void(^)())action {
|
|
@synchronized(self) {
|
|
if (isRunningActions) {
|
|
[queuedActionsToRun enqueue:[action copy]];
|
|
return;
|
|
}
|
|
isRunningActions = true;
|
|
}
|
|
|
|
while (true) {
|
|
@try {
|
|
action();
|
|
} @catch (id ex) {
|
|
[[Environment.logging getConditionLoggerForSender:self]
|
|
logError:@"A queued action failed and may have stalled an ObservableValue."];
|
|
@synchronized(self) {
|
|
isRunningActions = false;
|
|
}
|
|
[ex raise];
|
|
}
|
|
|
|
@synchronized(self) {
|
|
action = [queuedActionsToRun tryDequeue];
|
|
if (action == nil) {
|
|
isRunningActions = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
-(void)updateValue:(id)value {
|
|
[self queueRun:^{
|
|
if (value == currentValue) return;
|
|
requireState(!sealed);
|
|
|
|
currentValue = value;
|
|
for (void(^callback)(id value) in callbacks) {
|
|
callback(value);
|
|
}
|
|
}];
|
|
}
|
|
-(void)adjustValue:(id(^)(id))adjustment {
|
|
require(adjustment != nil);
|
|
[self queueRun:^{
|
|
id oldValue = currentValue;
|
|
id newValue = adjustment(oldValue);
|
|
if (oldValue == newValue) return;
|
|
requireState(!sealed);
|
|
|
|
currentValue = newValue;
|
|
for (void(^callback)(id value) in callbacks) {
|
|
callback(currentValue);
|
|
}
|
|
}];
|
|
}
|
|
|
|
@end
|
|
|
|
@implementation ObservableValueController
|
|
|
|
+(ObservableValueController *)observableValueControllerWithInitialValue:(id)value {
|
|
return [[ObservableValueController alloc] initWithValue:value];
|
|
}
|
|
|
|
-(void)updateValue:(id)value {
|
|
[super updateValue:value];
|
|
}
|
|
-(void)adjustValue:(id(^)(id))adjustment {
|
|
[super adjustValue:adjustment];
|
|
}
|
|
-(void) sealValue {
|
|
[self queueRun:^{sealed = true; callbacks = nil;}];
|
|
}
|
|
|
|
@end
|