4.0.0-rc.9.0 - removed ISubscriber (#20)

Includes
- [x] #18
- [x] #19

Co-authored-by: Alexander Kozachenko <119358312+Alex-Kozachenko@users.noreply.github.com>
Reviewed-on: #20
This commit is contained in:
Alexander Kozachenko 2023-12-07 23:31:37 +00:00
parent e4ed0c9962
commit 82ae73e80f
25 changed files with 60 additions and 251 deletions

View File

@ -11,14 +11,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publishers", "src\Publisher
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating", "src\Translating\Translating.csproj", "{5549AC68-4155-4E42-B88C-672A8D39E420}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions", "src\Subscriptions\Subscriptions.csproj", "{08D3A876-4D42-49AC-AD6D-E597F9B89822}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating.Tests", "tests\Translating\Translating.Tests.csproj", "{E33F062A-2120-4AF0-8688-330A8CA7F0B0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions.Tests", "tests\Subscriptions\Subscriptions.Tests.csproj", "{90879E74-5F24-4ED6-9988-6FFFFE38B96A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Filtering", "src\Filtering\Filtering.csproj", "{A24F6F20-3B62-44E3-888D-CBCD3F29C477}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Filtering.Tests", "tests\Filtering\Filtering.Tests.csproj", "{308F1FFC-191C-4C33-900A-0567413BE1BB}"
@ -46,18 +42,10 @@ Global
{5549AC68-4155-4E42-B88C-672A8D39E420}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.Build.0 = Release|Any CPU
{08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.Build.0 = Debug|Any CPU
{08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.ActiveCfg = Release|Any CPU
{08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.Build.0 = Release|Any CPU
{E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.Build.0 = Release|Any CPU
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.Build.0 = Release|Any CPU
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -75,9 +63,7 @@ Global
{FF4D591C-67FE-4E8E-AD73-D13465D6DB44} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
{4856538B-EE02-4568-B133-4612B01601B0} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
{5549AC68-4155-4E42-B88C-672A8D39E420} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
{08D3A876-4D42-49AC-AD6D-E597F9B89822} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
{E33F062A-2120-4AF0-8688-330A8CA7F0B0} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
{90879E74-5F24-4ED6-9988-6FFFFE38B96A} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
{A24F6F20-3B62-44E3-888D-CBCD3F29C477} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
{308F1FFC-191C-4C33-900A-0567413BE1BB} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
{22F31937-7F3E-47B2-A8BB-DC2B889BA228} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}

View File

@ -2,8 +2,7 @@
Implements a message broker with ability to build a pipeline of listeners.
The project consists of:
- observer-related interfaces,
- A publisher (see `PipelineMessagePublisher`).
- [Subscription](/docs/Subscriptions.md) mechanics for smart subscriber triggering;
- [Translators](/docs/Translating.md) for messages.
- observer-related interfaces [here](/src/Contracts/).
- `PipelineMessagePublisher` [here](/src/Publishers/PipelineMessagePublisher.cs).
- [Translating](/docs/Translating.md) for messages.
- [Filtering](/docs/Filtering.md) for messages.

View File

@ -1,82 +0,0 @@
# ProSol.Messaging.Subscriptions
Enables a conditional subscribing for a publisher:
```csharp
publisher.Subscribe(subscriber, msg => msg.Contains("ERROR"))
```
## Demo
Let`s create a console logger for error messages.
Add the package:
```
dotnet add package ProSol.Messaging --version 4.0.0-rc.6.0
```
Include the package:
```csharp
using ProSol.Messaging;
using ProSol.Messaging.Subscriptions;
```
Now, let`s build a system, which reacts on errors:
```csharp
var publisher = new PipelineMessagePublisher<string>();
var subscriber = new DataSubscriber<string>();
publisher
.Subscribe(new Logger(), msg => msg.Contains("ERROR"))
.Subscribe(subscriber);
```
So, it's a two subscribers:
- `Logger`: writes string to Console;
- `DataSubscriber`: accumulates string in innter list.
Now, lets simulate some logs:
```csharp
string[] source = [
"quick brown fox",
"introduced an ERROR",
"so the lazy dog",
"got a message"
];
foreach (var item in source)
{
publisher.Publish(item);
}
```
> Console output:
``` introduced an ERROR ```
> subscriber.Messages:
```
"quick brown fox",
"introduced an ERROR",
"so the lazy dog",
"got a message"
```
The `Logger` is just a dummy console writer:
```csharp
public class Logger : ISubscriber<string>
{
public void OnCompleted()
{
}
public void OnNext(string message)
{
Console.WriteLine(message);
}
}
```
So, the `Logger` reacts on the `IPublisher`'s messages if only they contain an "ERROR" string, without interfering the pipeline.
Happy coding!
- [BACK](../README.md)

View File

@ -3,7 +3,6 @@
<ItemGroup>
<ProjectReference Include="..\src\Contracts\Contracts.csproj" />
<ProjectReference Include="..\src\Publishers\Publishers.csproj" />
<ProjectReference Include="..\src\Subscriptions\Subscriptions.csproj" />
<ProjectReference Include="..\src\Translating\Translating.csproj" />
<ProjectReference Include="..\src\Filtering\Filtering.csproj" />
</ItemGroup>

View File

@ -3,7 +3,7 @@
<metadata>
<id>ProSol.Messaging</id>
<title>ProSol.Messaging</title>
<version>4.0.0-rc.8.0</version>
<version>4.0.0-rc.9.0</version>
<authors>Alex Kozachenko</authors>
<owners>Alex Kozachenko</owners>
<projectUrl> https://git.disroot.org/alexenko/ProSol.Messaging </projectUrl>

View File

@ -3,12 +3,14 @@ namespace ProSol.Messaging;
/// <summary>
/// A Publisher/Observable object.
/// </summary>
public interface IPublisher
public interface IPublisher<TMessage>
{
void Publish(TMessage message);
/// <summary>
/// Subscribe a subscriber.
/// </summary>
/// <param name="subscriber"> Target. </param>
/// <returns> Custom IDisposable implementation. </returns>
IDisposable Subscribe(ISubscriber subscriber);
IDisposable Subscribe(IPipelineSubscriber<TMessage> subscriber);
}

View File

@ -1,6 +0,0 @@
namespace ProSol.Messaging;
public interface IPublisher<TMessage> : IPublisher
{
void Publish(TMessage message);
}

View File

@ -2,7 +2,10 @@ namespace ProSol.Messaging;
public static class PublishHelper
{
public static void Publish<TMessage>(this IPublisher publisher, TMessage message, params ISubscriber[] subscribers)
public static void Publish<TMessage>(
this IPublisher<TMessage> publisher,
TMessage message,
params IPipelineSubscriber<TMessage>[] subscribers)
{
if (subscribers.Length == 0)
{
@ -13,7 +16,11 @@ public static class PublishHelper
publisher.Publish(message, subscribers[0], next);
}
public static void Publish<TMessage>(this IPublisher publisher, TMessage message, ISubscriber subscriber, NextDelegate next)
public static void Publish<TMessage>(
this IPublisher<TMessage> publisher,
TMessage message,
IPipelineSubscriber<TMessage> subscriber,
NextDelegate next)
{
switch (subscriber)
{
@ -23,12 +30,6 @@ public static class PublishHelper
pipelineSubscriber.OnNext(message, next);
return;
}
case ISubscriber<TMessage> final:
{
final.OnNext(message);
next();
return;
}
default: throw new ArgumentException($"Incorrect subscriber: {subscriber}");
}
}

View File

@ -2,7 +2,7 @@ using System.Collections.Immutable;
namespace ProSol.Messaging;
public sealed class DataSubscriber<TMessage> : ISubscriber<TMessage>
public sealed class DataSubscriber<TMessage> : IPipelineSubscriber<TMessage>
{
private readonly List<TMessage> messages = [];
@ -12,8 +12,9 @@ public sealed class DataSubscriber<TMessage> : ISubscriber<TMessage>
{
}
public void OnNext(TMessage message)
public void OnNext(TMessage message, NextDelegate next)
{
messages.Add(message);
next();
}
}

View File

@ -5,7 +5,7 @@ public delegate void NextDelegate();
/// <summary>
/// An Observer/Subscriber object with pipeline support.
/// </summary>
public interface IPipelineSubscriber<TMessage> : ISubscriber
public interface IPipelineSubscriber<TMessage>
{
/// <summary>
/// Represents a reaction on a message.
@ -16,4 +16,9 @@ public interface IPipelineSubscriber<TMessage> : ISubscriber
/// - For more details about ASP.NET pipeline, see https://learn.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-8.0
/// </remarks>
void OnNext(TMessage message, NextDelegate next);
/// <summary>
/// Finalizes a subscriber.
/// </summary>
void OnCompleted();
}

View File

@ -1,10 +0,0 @@
namespace ProSol.Messaging;
public interface ISubscriber
{
/// <summary>
/// Finalizes a subscriber.
/// </summary>
void OnCompleted();
}

View File

@ -1,7 +0,0 @@
namespace ProSol.Messaging;
public interface ISubscriber<TMessage> : ISubscriber
{
void OnNext(TMessage message);
}

View File

@ -2,7 +2,6 @@
<ItemGroup>
<ProjectReference Include="..\Contracts\Contracts.csproj" />
<ProjectReference Include="..\Subscriptions\Subscriptions.csproj" />
<ProjectReference Include="..\Translating\Translating.csproj" />
</ItemGroup>

View File

@ -7,8 +7,8 @@ public static class IPublisherFluentExtensions
/// <summary>
/// Filters a messages from a publisher.
/// </summary>
public static IPublisher Filter<TMessage>(
this IPublisher publisher,
public static IPublisher<TMessage> Filter<TMessage>(
this IPublisher<TMessage> publisher,
Predicate<TMessage> filterCondition)
{
var retranslator = new Retranslator<TMessage>();
@ -21,7 +21,8 @@ public static class IPublisherFluentExtensions
/// Filters a messages from a publisher.
/// Interrupts a pipeline of a publisher, when the <see cref="filterCondition"/> is met.
/// </summary>
public static IPublisher Endpoint<TMessage>(this IPublisher publisher,
public static IPublisher<TMessage> Endpoint<TMessage>(
this IPublisher<TMessage> publisher,
Predicate<TMessage> filterCondition)
{
var retranslator = new Retranslator<TMessage>();

View File

@ -5,26 +5,26 @@
/// Provides a pipeline mechanics, so a subscriber is able to stop message processing.
/// </summary>
/// <typeparam name="TMessage"> Type of message. </typeparam>
public class PipelineMessagePublisher<TMessage> :
IPublisher<TMessage>
public class PipelineMessagePublisher<TMessage>
: IPublisher<TMessage>
{
private readonly HashSet<ISubscriber> subscribers = [];
private readonly HashSet<IPipelineSubscriber<TMessage>> subscribers = [];
/// <summary>
/// Subscribes a subscriber.
/// </summary>
/// <param name="subscriber"> An implementation of <see cref="ISubscriber"/> or <see cref="SubscriptionBase"/> interfaces. </param>
/// <param name="subscriber"> An implementation of <see cref="IPipelineSubscriber"/> interface. </param>
/// <returns> Unsubscriber. </returns>
public IDisposable Subscribe(ISubscriber subscriber)
public IDisposable Subscribe(IPipelineSubscriber<TMessage> subscriber)
{
subscribers.Add(subscriber);
return new Unsubscriber(subscribers, subscriber);
return new Unsubscriber<TMessage>(subscribers, subscriber);
}
/// <summary>
/// Pushes the message to subscribers.
/// </summary>
/// <param name="message"> A message to push to specific <see cref="ISubscriber"/>. </param>
/// <param name="message"> A message to push to specific <see cref="IPipelineSubscriber"/>. </param>
public void Publish(TMessage message)
=> this.Publish(message, [.. subscribers]);

View File

@ -3,7 +3,6 @@
<ItemGroup>
<ProjectReference Include="..\Contracts\Contracts.csproj" />
<ProjectReference Include="..\Translating\Translating.csproj" />
<ProjectReference Include="..\Subscriptions\Subscriptions.csproj" />
</ItemGroup>
<PropertyGroup>

View File

@ -1,13 +1,13 @@
namespace ProSol.Messaging;
public sealed class Unsubscriber : IDisposable
public sealed class Unsubscriber<TMessage> : IDisposable
{
private readonly ICollection<ISubscriber> subscribers;
private readonly ISubscriber target;
private readonly ICollection<IPipelineSubscriber<TMessage>> subscribers;
private readonly IPipelineSubscriber<TMessage> target;
internal Unsubscriber(
ICollection<ISubscriber> subscribers,
ISubscriber target)
ICollection<IPipelineSubscriber<TMessage>> subscribers,
IPipelineSubscriber<TMessage> target)
{
this.subscribers = subscribers;
this.target = target;

View File

@ -1,26 +0,0 @@
namespace ProSol.Messaging.Subscriptions;
public static class IPublisherFluentExtensions
{
[Obsolete("I am planning to remove these methods in next rc-release. Please use Filtering.Filter or Filtering.Endpoint instead.")]
public static IPublisher Subscribe<TMessage>(
this IPublisher publisher,
ISubscriber<TMessage> subscriber,
Predicate<TMessage> condition)
{
var subcription = new RegularSubscription<TMessage>(subscriber, condition);
var unsubscriber = publisher.Subscribe(subcription);
return publisher;
}
[Obsolete("I am planning to remove these methods in next rc-release. Please use Filtering.Filter or Filtering.Endpoint instead.")]
public static IPublisher Subscribe<TMessage>(
this IPublisher publisher,
IPipelineSubscriber<TMessage> subscriber,
Predicate<TMessage> condition)
{
var subcription = new PipelineSubscription<TMessage>(subscriber, condition);
var unsubscriber = publisher.Subscribe(subcription);
return publisher;
}
}

View File

@ -1,25 +0,0 @@
namespace ProSol.Messaging.Subscriptions;
internal class PipelineSubscription<TMessage>(
IPipelineSubscriber<TMessage> subscriber,
Predicate<TMessage> condition)
: IPipelineSubscriber<TMessage>
{
public void OnCompleted()
{
}
public void OnNext(TMessage message, NextDelegate next)
{
static void dropOff() { }; // acts like an endpoint.
if (condition(message))
{
subscriber.OnNext(message, dropOff);
}
else
{
next();
}
}
}

View File

@ -1,21 +0,0 @@
namespace ProSol.Messaging.Subscriptions;
internal class RegularSubscription<TMessage>(
ISubscriber<TMessage> subscriber,
Predicate<TMessage> condition)
: ISubscriber<TMessage>
{
public void OnCompleted()
{
}
public void OnNext(TMessage message)
{
if (condition(message))
{
subscriber.OnNext(message);
}
}
}

View File

@ -1,12 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\Contracts\Contracts.csproj" />
</ItemGroup>
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>

View File

@ -2,13 +2,13 @@ namespace ProSol.Messaging.Translating;
public static class IPublisherFluentExtensions
{
public static IPublisher Translate<TSource, TDest>(
this IPublisher publisher,
public static IPublisher<TDest> Translate<TSource, TDest>(
this IPublisher<TSource> publisher,
Func<TSource, TDest> converter)
=> Translate(publisher, new SimpleTranslator<TSource, TDest>(converter));
public static IPublisher Translate<TSource, TDest>(
this IPublisher publisher,
public static IPublisher<TDest> Translate<TSource, TDest>(
this IPublisher<TSource> publisher,
TranslatorBase<TSource, TDest> translatorBase)
{
var unsubscriber = publisher.Subscribe(translatorBase);

View File

@ -1,9 +1,9 @@
namespace ProSol.Messaging.Translating;
public abstract class TranslatorBase<TSource, TDest>
: IPipelineSubscriber<TSource>, IPublisher, IDisposable
: IPipelineSubscriber<TSource>, IPublisher<TDest>, IDisposable
{
private ISubscriber? destSubscriber;
private IPipelineSubscriber<TDest>? destSubscriber;
/// <inheritdoc/>
public void OnCompleted() => destSubscriber?.OnCompleted();
@ -28,7 +28,15 @@ public abstract class TranslatorBase<TSource, TDest>
/// </remarks>
protected abstract TDest ConvertMessage(TSource message);
public IDisposable Subscribe(ISubscriber subscriber)
public void Publish(TDest message)
{
if (destSubscriber != null)
{
this.Publish(message, destSubscriber, () => {});
}
}
public IDisposable Subscribe(IPipelineSubscriber<TDest> subscriber)
{
if (destSubscriber != null)
{

View File

@ -3,7 +3,7 @@ namespace ProSol.Messaging.Tests.Common;
public class TestPublisher<TMessage>(IEnumerable<TMessage> messages) : IPublisher<TMessage>
{
readonly Queue<TMessage> messages = new(messages);
ISubscriber? subscriber;
IPipelineSubscriber<TMessage>? subscriber;
public void PublishAll()
{
@ -17,8 +17,8 @@ public class TestPublisher<TMessage>(IEnumerable<TMessage> messages) : IPublishe
{
this.Publish(message, subscriber!);
}
public IDisposable Subscribe(ISubscriber subscriber)
public IDisposable Subscribe(IPipelineSubscriber<TMessage> subscriber)
{
this.subscriber = subscriber;
#pragma warning disable CS8603

View File

@ -4,7 +4,6 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
@ -19,7 +18,6 @@
<ItemGroup>
<ProjectReference Include="..\Common\Common.csproj" />
<ProjectReference Include="..\..\src\Subscriptions\Subscriptions.csproj" />
</ItemGroup>
</Project>