moved Messaging to dedicated library.

This commit is contained in:
Alexander Kozachenko 2023-11-21 11:44:58 +03:00
parent 5d4a7ad344
commit 019cf5ce2a
8 changed files with 34 additions and 120 deletions

View File

@ -1,14 +0,0 @@
using ProSol.Messaging;
namespace ProSol.Html.Contracts.Operations;
/// <summary>
/// Provides a conditional subscription.
/// </summary>
/// <remarks>
/// - currently, the filter is the tag name.
/// </remarks>
public interface IFilteredPublisher<T> : IPublisher<T>
{
public IDisposable Subscribe(ISubscriber<T> subscriber, params string[] filters);
}

View File

@ -1,6 +0,0 @@
namespace ProSol.Html.Contracts.Operations;
public interface IPriorityObservable<T> : IObservable<T>
{
public IDisposable Subscribe(IObserver<T> observer, int priority);
}

View File

@ -1,83 +0,0 @@
using ProSol.Html.Contracts.Data;
using ProSol.Messaging;
namespace ProSol.Html.Messaging;
// TODO: Move to ProSol.Messaging.
internal class Broadcaster
{
readonly HashSet<TagObserver> observers = [];
internal void OnNext(TagsProviderMessage message, string tagName)
{
var candidates = observers.Where(x => x.TagName == tagName || x.TagName == null);
var ejectable = new EjectableMessage();
foreach (var observer in candidates)
{
observer.Observer.OnNext(ejectable, message);
if (ejectable.IsEjected)
{
break;
}
}
}
internal void OnCompleted()
{
foreach (var observer in observers)
{
observer.Observer.OnCompleted();
}
}
internal IDisposable Subscribe(ISubscriber<TagsProviderMessage> observer)
{
var tagObserver = new TagObserver(observer, null);
observers.Add(tagObserver);
return new Unsubscriber<TagsProviderMessage>(observers, [tagObserver]);
}
internal IDisposable Subscribe(ISubscriber<TagsProviderMessage> observer, params string[] tagNames)
{
if (tagNames == null)
{
return Subscribe(observer);
}
var tagObservers = new List<TagObserver>();
foreach (var tagName in tagNames)
{
var tagObserver = new TagObserver(observer, tagName);
observers.Add(tagObserver);
tagObservers.Add(tagObserver);
}
return new Unsubscriber<TagsProviderMessage>(observers, tagObservers);
}
private sealed class Unsubscriber<T>(
ICollection<TagObserver> observers,
List<TagObserver> targets)
: IDisposable
{
private readonly ICollection<TagObserver> observers = observers;
private readonly List<TagObserver> targets = targets;
public void Dispose()
{
foreach (var target in targets)
{
observers.Remove(target);
}
}
}
private class EjectableMessage : IEjectable
{
public bool IsEjected { get; set; } = false;
public void Eject()
{
IsEjected = true;
}
}
}

View File

@ -1,6 +0,0 @@
using ProSol.Html.Contracts.Data;
using ProSol.Messaging;
namespace ProSol.Html.Messaging;
internal record class TagObserver(ISubscriber<TagsProviderMessage> Observer, string? TagName);

View File

@ -0,0 +1,21 @@
using ProSol.Html.Contracts.Data;
using ProSol.Messaging;
using ProSol.Messaging.Subsciptions;
namespace ProSol.Html.Messaging;
public class TagsSubscription(ISubscriber<TagsProviderMessage> subscriber, params string[] tagNames)
: SubscriptionBase<TagsProviderMessage>(subscriber)
{
protected override void OnNext(ISubscriber<TagsProviderMessage> subscriber, TagsProviderMessage message, NextDelegate next)
{
if (tagNames.Any(x => message.CurrentTag.TagInfo.Name == x))
{
subscriber.OnNext(message, next);
}
// HACK: FilteredSubscription should not break a pipeline
// just because the filter doesn't match.
next();
}
}

View File

@ -8,7 +8,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="ProSol.Messaging" Version="2.0.0" />
<PackageReference Include="ProSol.Messaging" Version="3.0.0-rc.0.2" />
</ItemGroup>
</Project>

View File

@ -1,8 +1,8 @@
using ProSol.Html.Contracts.Data;
using ProSol.Html.Contracts.Operations;
using ProSol.Html.Data;
using ProSol.Html.Messaging;
using ProSol.Messaging;
using ProSol.Messaging.Subsciptions;
namespace ProSol.Html;
@ -12,10 +12,10 @@ namespace ProSol.Html;
/// <remarks>
/// Push-notification happens only when the closing tag met, so it contains the full data on tag.
/// </remarks>
public class TagsProvider : IFilteredPublisher<TagsProviderMessage>
public class TagsProvider
{
readonly HistoryTracker historyTracker = new();
readonly Broadcaster broadcaster = new();
readonly MessageBroker<TagsProviderMessage> broker = new();
public void Process(ReadOnlySpan<char> html)
{
@ -28,14 +28,16 @@ public class TagsProvider : IFilteredPublisher<TagsProviderMessage>
charsProcessed += Proceed(currentHtml);
} while (charsProcessed < html.Length);
broadcaster.OnCompleted();
broker.Complete();
}
public IDisposable Subscribe(ISubscriber<TagsProviderMessage> observer)
=> broadcaster.Subscribe(observer);
=> broker.Subscribe(observer);
public IDisposable Subscribe(ISubscriber<TagsProviderMessage> observer, params string[] tagNames)
=> broadcaster.Subscribe(observer, tagNames);
{
return broker.Subscribe(new TagsSubscription(observer, tagNames));
}
void Process(ReadOnlySpan<char> currentHtml, int charsProcessed)
{
@ -74,7 +76,7 @@ public class TagsProvider : IFilteredPublisher<TagsProviderMessage>
[..history],
value);
broadcaster.OnNext(message, value.TagInfo.Name);
broker.Publish(message);
}
static int Proceed(ReadOnlySpan<char> currentHtml)

View File

@ -14,7 +14,7 @@ internal class TagsProviderListener : ISubscriber<TagsProviderMessage>
public TagsProviderMessage[] Messages => [.. messages];
public void Subscribe(IPublisher<TagsProviderMessage> source)
public void Subscribe(TagsProvider source)
{
unsubscriber = source.Subscribe(this);
}
@ -24,9 +24,9 @@ internal class TagsProviderListener : ISubscriber<TagsProviderMessage>
unsubscriber?.Dispose();
}
public bool OnNext(IEjectable ejectable, TagsProviderMessage value)
public void OnNext(TagsProviderMessage value, NextDelegate next)
{
messages.Add(value);
return false; // TODO: remove;
next();
}
}