Filters (#17)
This commit is contained in:
parent
5d0c378a0e
commit
e4ed0c9962
|
@ -19,6 +19,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating.Tests", "tests\
|
|||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions.Tests", "tests\Subscriptions\Subscriptions.Tests.csproj", "{90879E74-5F24-4ED6-9988-6FFFFE38B96A}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Filtering", "src\Filtering\Filtering.csproj", "{A24F6F20-3B62-44E3-888D-CBCD3F29C477}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Filtering.Tests", "tests\Filtering\Filtering.Tests.csproj", "{308F1FFC-191C-4C33-900A-0567413BE1BB}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Common", "tests\Common\Common.csproj", "{22F31937-7F3E-47B2-A8BB-DC2B889BA228}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
|
@ -52,6 +58,18 @@ Global
|
|||
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{A24F6F20-3B62-44E3-888D-CBCD3F29C477}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{308F1FFC-191C-4C33-900A-0567413BE1BB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{308F1FFC-191C-4C33-900A-0567413BE1BB}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{308F1FFC-191C-4C33-900A-0567413BE1BB}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{308F1FFC-191C-4C33-900A-0567413BE1BB}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{22F31937-7F3E-47B2-A8BB-DC2B889BA228}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{22F31937-7F3E-47B2-A8BB-DC2B889BA228}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{22F31937-7F3E-47B2-A8BB-DC2B889BA228}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{22F31937-7F3E-47B2-A8BB-DC2B889BA228}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(NestedProjects) = preSolution
|
||||
{FF4D591C-67FE-4E8E-AD73-D13465D6DB44} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
|
||||
|
@ -60,5 +78,8 @@ Global
|
|||
{08D3A876-4D42-49AC-AD6D-E597F9B89822} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
|
||||
{E33F062A-2120-4AF0-8688-330A8CA7F0B0} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
|
||||
{90879E74-5F24-4ED6-9988-6FFFFE38B96A} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
|
||||
{A24F6F20-3B62-44E3-888D-CBCD3F29C477} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
|
||||
{308F1FFC-191C-4C33-900A-0567413BE1BB} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
|
||||
{22F31937-7F3E-47B2-A8BB-DC2B889BA228} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
|
|
|
@ -3,5 +3,7 @@
|
|||
Implements a message broker with ability to build a pipeline of listeners.
|
||||
The project consists of:
|
||||
- observer-related interfaces,
|
||||
- A publisher (see `PipelineMessagePublisher`).
|
||||
- [Subscription](/docs/Subscriptions.md) mechanics for smart subscriber triggering;
|
||||
- [Translators](/docs/Translating.md) for messages.
|
||||
- [Translators](/docs/Translating.md) for messages.
|
||||
- [Filtering](/docs/Filtering.md) for messages.
|
|
@ -0,0 +1,129 @@
|
|||
# ProSol.Messaging.Filtering
|
||||
|
||||
Allows to catch a specific message by using a predicate.
|
||||
Either clones a message to a subscriber behind via ('.Filter()`) method,
|
||||
or swallows a message out of the pipeline via ('.Endpoint()') method.
|
||||
|
||||
# Demo
|
||||
|
||||
## Filter
|
||||
|
||||
Let's create an app, which logs all strings provided.
|
||||
But counts specific strings in separate subscribers.
|
||||
So the output will demonstrate how many specific messages passed.
|
||||
|
||||
```
|
||||
dotnet add package ProSol.Messaging --version 4.0.0-rc.8.0
|
||||
```
|
||||
|
||||
### Input
|
||||
Let the messages look like this:
|
||||
|
||||
```csharp
|
||||
string[] input = [
|
||||
"nothing happens",
|
||||
"WARN: warning1",
|
||||
"WARN: warning2",
|
||||
"CRIT: critical",
|
||||
"nothing happens",
|
||||
];
|
||||
```
|
||||
|
||||
### Pipeline configuration
|
||||
The pipeline configuration consists of three ways: warns, crits and rest collectors:
|
||||
|
||||
```csharp
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var warnsCountCollector = new DataSubscriber<bool>();
|
||||
var critsCountCollector = new DataSubscriber<bool>();
|
||||
var restCollector = new DataSubscriber<string>();
|
||||
```
|
||||
|
||||
Lets add three subscriber to the publisher:
|
||||
1. Reacts on any "WARN:" messages.
|
||||
2. Reacts on any "CRIT:" messages.
|
||||
3. Gets all messages.
|
||||
|
||||
```csharp
|
||||
publisher
|
||||
.Filter<string>(x => x.Contains("WARN:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(warnsCountCollector);
|
||||
|
||||
publisher
|
||||
.Filter<string>(x => x.Contains("CRIT:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(critsCountCollector);
|
||||
|
||||
publisher
|
||||
.Subscribe(restCollector);
|
||||
```
|
||||
|
||||
### Try
|
||||
|
||||
Lets try this configuration against provided [input](#input):
|
||||
|
||||
```csharp
|
||||
foreach (var message in input)
|
||||
{
|
||||
publisher.Publish(message);
|
||||
}
|
||||
```
|
||||
|
||||
The output is:
|
||||
```
|
||||
warnsCountCollector.Messages.Count() == 2;
|
||||
critsCountCollector.Messages.Count() == 1;
|
||||
restCollector.Messages; // Contains all messages from input.
|
||||
```
|
||||
|
||||
So, the publisher got three subscribers, every message reaches on every subscriber, but two of subscribers are limited by Filter so they react only on specific messages.
|
||||
|
||||
But what if there is a need to swallow a message when the filter condition is met?
|
||||
Let's see the [Endpoints](#endpoint) for that.
|
||||
|
||||
## Endpoint
|
||||
|
||||
Let's just change Filter for Endpoint methods:
|
||||
```csharp
|
||||
publisher
|
||||
.Endpoint<string>(x => x.Contains("WARN:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(warnsCountCollector);
|
||||
|
||||
publisher
|
||||
.Endpoint<string>(x => x.Contains("CRIT:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(critsCountCollector);
|
||||
|
||||
publisher
|
||||
.Subscribe(restCollector);
|
||||
```
|
||||
|
||||
Then run them with same [input](#input):
|
||||
|
||||
```csharp
|
||||
foreach (var message in input)
|
||||
{
|
||||
publisher.Publish(message);
|
||||
}
|
||||
```
|
||||
|
||||
And see the output:
|
||||
```csharp
|
||||
warnsCountCollector.Messages.Count() == 2;
|
||||
critsCountCollector.Messages.Count() == 1;
|
||||
|
||||
restCollector.Messages;
|
||||
// OUTPUT:
|
||||
// nothing happens
|
||||
// nothing happens
|
||||
```
|
||||
|
||||
So, the `restCollector` only getting the **unprocessed* messages, and acts like a fallback subscriber in this case.
|
||||
|
||||
# Conclusion
|
||||
|
||||
So, basically, you can either react or catch a message with `Filter` or `Endpoint` methods respectively.
|
||||
|
||||
- [BACK](../README.md)
|
|
@ -7,7 +7,7 @@ publisher.Subscribe(subscriber, msg => msg.Contains("ERROR"))
|
|||
|
||||
## Demo
|
||||
|
||||
Lets create a console logger for error messages.
|
||||
Let`s create a console logger for error messages.
|
||||
|
||||
Add the package:
|
||||
```
|
||||
|
@ -20,7 +20,7 @@ using ProSol.Messaging;
|
|||
using ProSol.Messaging.Subscriptions;
|
||||
```
|
||||
|
||||
Now, lets build a system, which reacts on errors:
|
||||
Now, let`s build a system, which reacts on errors:
|
||||
```csharp
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var subscriber = new DataSubscriber<string>();
|
||||
|
@ -75,4 +75,8 @@ public class Logger : ISubscriber<string>
|
|||
}
|
||||
```
|
||||
|
||||
So, the `Logger` reacts on the `IPublisher`'s messages if only they contain an "ERROR" string, without interfering the pipeline.
|
||||
|
||||
Happy coding!
|
||||
|
||||
- [BACK](../README.md)
|
|
@ -1,8 +1,6 @@
|
|||
# ProSol.Messaging.Translating
|
||||
|
||||
A toolset for making a chain of subscribers from one format to another.
|
||||
|
||||
It reduces the amout of extra subscribers which could exist just for a one line of code.
|
||||
Allows to translate one message type to another with one line via `.Translate` method.
|
||||
|
||||
## Demo
|
||||
|
||||
|
@ -22,7 +20,7 @@ using ProSol.Messaging.Translating;
|
|||
var dataSubscriber = new DataSubscriber<bool>(); // a dummy subscriber which just collects data in List.
|
||||
var publisher = new SomePublisher<string>(); // a custom dummy publisher, see below.
|
||||
|
||||
// Decalre a pipeline:
|
||||
// Declare a pipeline:
|
||||
publisher
|
||||
.Translate<string, int>(x => x.Length)
|
||||
.Translate<int, bool>(x => x > 5)
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
<ProjectReference Include="..\src\Publishers\Publishers.csproj" />
|
||||
<ProjectReference Include="..\src\Subscriptions\Subscriptions.csproj" />
|
||||
<ProjectReference Include="..\src\Translating\Translating.csproj" />
|
||||
<ProjectReference Include="..\src\Filtering\Filtering.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
./nuget-pack.sh
|
||||
./.nuget-push.sh
|
|
@ -3,7 +3,7 @@
|
|||
<metadata>
|
||||
<id>ProSol.Messaging</id>
|
||||
<title>ProSol.Messaging</title>
|
||||
<version>4.0.0-rc.6.0</version>
|
||||
<version>4.0.0-rc.8.0</version>
|
||||
<authors>Alex Kozachenko</authors>
|
||||
<owners>Alex Kozachenko</owners>
|
||||
<projectUrl> https://git.disroot.org/alexenko/ProSol.Messaging </projectUrl>
|
||||
|
|
|
@ -2,16 +2,25 @@ namespace ProSol.Messaging;
|
|||
|
||||
public static class PublishHelper
|
||||
{
|
||||
public static void Publish<TMessage>(TMessage message, params ISubscriber[] subscribers)
|
||||
public static void Publish<TMessage>(this IPublisher publisher, TMessage message, params ISubscriber[] subscribers)
|
||||
{
|
||||
void next() => Publish(message, subscribers[1..]);
|
||||
switch (subscribers.FirstOrDefault())
|
||||
if (subscribers.Length == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
void next() => publisher.Publish(message, subscribers[1..]);
|
||||
publisher.Publish(message, subscribers[0], next);
|
||||
}
|
||||
|
||||
public static void Publish<TMessage>(this IPublisher publisher, TMessage message, ISubscriber subscriber, NextDelegate next)
|
||||
{
|
||||
switch (subscriber)
|
||||
{
|
||||
case null: return;
|
||||
case IPipelineSubscriber<TMessage> subscriber:
|
||||
case IPipelineSubscriber<TMessage> pipelineSubscriber:
|
||||
{
|
||||
|
||||
subscriber.OnNext(message, next);
|
||||
pipelineSubscriber.OnNext(message, next);
|
||||
return;
|
||||
}
|
||||
case ISubscriber<TMessage> final:
|
||||
|
@ -20,9 +29,7 @@ public static class PublishHelper
|
|||
next();
|
||||
return;
|
||||
}
|
||||
default: throw new ArgumentException($"Incorrect subscriber: {subscriber}");
|
||||
}
|
||||
}
|
||||
|
||||
public static void Publish<TMessage>(TMessage message, ISubscriber subscriber)
|
||||
=> Publish(message, [subscriber]);
|
||||
}
|
|
@ -10,7 +10,6 @@ public sealed class DataSubscriber<TMessage> : ISubscriber<TMessage>
|
|||
|
||||
public void OnCompleted()
|
||||
{
|
||||
messages.Clear();
|
||||
}
|
||||
|
||||
public void OnNext(TMessage message)
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
namespace ProSol.Messaging.Filtering;
|
||||
|
||||
internal class EndpointSubscription<TMessage>(
|
||||
IPipelineSubscriber<TMessage> subscriber,
|
||||
Predicate<TMessage> condition)
|
||||
: IPipelineSubscriber<TMessage>
|
||||
{
|
||||
public void OnCompleted()
|
||||
{
|
||||
}
|
||||
|
||||
public void OnNext(TMessage message, NextDelegate next)
|
||||
{
|
||||
static void dropOff() { };
|
||||
if (condition(message))
|
||||
{
|
||||
subscriber.OnNext(message, dropOff);
|
||||
}
|
||||
else
|
||||
{
|
||||
next();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
namespace ProSol.Messaging.Filtering;
|
||||
|
||||
internal class FilterSubscription<TMessage>(
|
||||
IPipelineSubscriber<TMessage> subscriber,
|
||||
Predicate<TMessage> condition)
|
||||
: IPipelineSubscriber<TMessage>
|
||||
{
|
||||
public void OnCompleted()
|
||||
{
|
||||
}
|
||||
|
||||
public void OnNext(TMessage message, NextDelegate next)
|
||||
{
|
||||
if (condition(message))
|
||||
{
|
||||
subscriber.OnNext(message, next);
|
||||
}
|
||||
else
|
||||
{
|
||||
next();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Contracts\Contracts.csproj" />
|
||||
<ProjectReference Include="..\Subscriptions\Subscriptions.csproj" />
|
||||
<ProjectReference Include="..\Translating\Translating.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,32 @@
|
|||
using ProSol.Messaging.Translating;
|
||||
|
||||
namespace ProSol.Messaging.Filtering;
|
||||
|
||||
public static class IPublisherFluentExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Filters a messages from a publisher.
|
||||
/// </summary>
|
||||
public static IPublisher Filter<TMessage>(
|
||||
this IPublisher publisher,
|
||||
Predicate<TMessage> filterCondition)
|
||||
{
|
||||
var retranslator = new Retranslator<TMessage>();
|
||||
var filter = new FilterSubscription<TMessage>(retranslator, filterCondition);
|
||||
var unsubscriber = publisher.Subscribe(filter);
|
||||
return retranslator;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Filters a messages from a publisher.
|
||||
/// Interrupts a pipeline of a publisher, when the <see cref="filterCondition"/> is met.
|
||||
/// </summary>
|
||||
public static IPublisher Endpoint<TMessage>(this IPublisher publisher,
|
||||
Predicate<TMessage> filterCondition)
|
||||
{
|
||||
var retranslator = new Retranslator<TMessage>();
|
||||
var filter = new EndpointSubscription<TMessage>(retranslator, filterCondition);
|
||||
var unsubscriber = publisher.Subscribe(filter);
|
||||
return retranslator;
|
||||
}
|
||||
}
|
|
@ -26,7 +26,7 @@ public class PipelineMessagePublisher<TMessage> :
|
|||
/// </summary>
|
||||
/// <param name="message"> A message to push to specific <see cref="ISubscriber"/>. </param>
|
||||
public void Publish(TMessage message)
|
||||
=> PublishHelper.Publish(message, [.. subscribers]);
|
||||
=> this.Publish(message, [.. subscribers]);
|
||||
|
||||
/// <summary>
|
||||
/// Pushes the `complete` message, so the subscribers can finalize the subscription.
|
||||
|
|
|
@ -2,6 +2,7 @@ namespace ProSol.Messaging.Subscriptions;
|
|||
|
||||
public static class IPublisherFluentExtensions
|
||||
{
|
||||
[Obsolete("I am planning to remove these methods in next rc-release. Please use Filtering.Filter or Filtering.Endpoint instead.")]
|
||||
public static IPublisher Subscribe<TMessage>(
|
||||
this IPublisher publisher,
|
||||
ISubscriber<TMessage> subscriber,
|
||||
|
@ -12,6 +13,7 @@ public static class IPublisherFluentExtensions
|
|||
return publisher;
|
||||
}
|
||||
|
||||
[Obsolete("I am planning to remove these methods in next rc-release. Please use Filtering.Filter or Filtering.Endpoint instead.")]
|
||||
public static IPublisher Subscribe<TMessage>(
|
||||
this IPublisher publisher,
|
||||
IPipelineSubscriber<TMessage> subscriber,
|
|
@ -12,9 +12,10 @@ internal class PipelineSubscription<TMessage>(
|
|||
|
||||
public void OnNext(TMessage message, NextDelegate next)
|
||||
{
|
||||
static void dropOff() { }; // acts like an endpoint.
|
||||
if (condition(message))
|
||||
{
|
||||
subscriber.OnNext(message, next);
|
||||
subscriber.OnNext(message, dropOff);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
namespace ProSol.Messaging.Translating;
|
||||
|
||||
public static class IPublisherFluentExtensions
|
||||
{
|
||||
public static IPublisher Translate<TSource, TDest>(
|
||||
this IPublisher publisher,
|
||||
Func<TSource, TDest> converter)
|
||||
=> Translate(publisher, new SimpleTranslator<TSource, TDest>(converter));
|
||||
|
||||
public static IPublisher Translate<TSource, TDest>(
|
||||
this IPublisher publisher,
|
||||
TranslatorBase<TSource, TDest> translatorBase)
|
||||
{
|
||||
var unsubscriber = publisher.Subscribe(translatorBase);
|
||||
return translatorBase;
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
namespace ProSol.Messaging.Translating;
|
||||
|
||||
public static class IPublisherFluentExtensions
|
||||
{
|
||||
public static IPublisher Translate<TSource, TDest>(
|
||||
this IPublisher publisher,
|
||||
Func<TSource, TDest> converter)
|
||||
{
|
||||
var result = new SimpleTranslator<TSource, TDest>(converter);
|
||||
var unsubscriber = publisher.Subscribe(result);
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
namespace ProSol.Messaging.Translating;
|
||||
|
||||
/// <summary>
|
||||
/// A dummy subscriber which just pushes a message.
|
||||
/// Usefull for custom wrappers around a subscribers.
|
||||
/// </summary>
|
||||
public sealed class Retranslator<TMessage> : TranslatorBase<TMessage, TMessage>
|
||||
{
|
||||
protected override TMessage ConvertMessage(TMessage message) => message;
|
||||
}
|
|
@ -12,13 +12,9 @@ public abstract class TranslatorBase<TSource, TDest>
|
|||
public void OnNext(TSource message, NextDelegate next)
|
||||
{
|
||||
var dest = ConvertMessage(message);
|
||||
if (dest != null && destSubscriber != null)
|
||||
if (destSubscriber != null)
|
||||
{
|
||||
PublishHelper.Publish(dest, destSubscriber);
|
||||
}
|
||||
else
|
||||
{
|
||||
next();
|
||||
this.Publish(dest, destSubscriber, next);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,7 +26,7 @@ public abstract class TranslatorBase<TSource, TDest>
|
|||
/// <remarks>
|
||||
/// - The NextDelegate is triggered by target subsriber, or by this class if unconvertable.
|
||||
/// </remarks>
|
||||
protected abstract TDest? ConvertMessage(TSource message);
|
||||
protected abstract TDest ConvertMessage(TSource message);
|
||||
|
||||
public IDisposable Subscribe(ISubscriber subscriber)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Contracts\Contracts.csproj" />
|
||||
<ProjectReference Include="..\..\src\Publishers\Publishers.csproj" />
|
||||
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,28 @@
|
|||
namespace ProSol.Messaging.Tests.Common;
|
||||
|
||||
public class TestPublisher<TMessage>(IEnumerable<TMessage> messages) : IPublisher<TMessage>
|
||||
{
|
||||
readonly Queue<TMessage> messages = new(messages);
|
||||
ISubscriber? subscriber;
|
||||
|
||||
public void PublishAll()
|
||||
{
|
||||
while(messages.Count != 0)
|
||||
{
|
||||
Publish(messages.Dequeue());
|
||||
}
|
||||
}
|
||||
|
||||
public void Publish(TMessage message)
|
||||
{
|
||||
this.Publish(message, subscriber!);
|
||||
}
|
||||
|
||||
public IDisposable Subscribe(ISubscriber subscriber)
|
||||
{
|
||||
this.subscriber = subscriber;
|
||||
#pragma warning disable CS8603
|
||||
return null;
|
||||
#pragma warning restore CS8603
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
|
||||
<PackageReference Include="NUnit" Version="3.13.3" />
|
||||
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />
|
||||
<PackageReference Include="NUnit.Analyzers" Version="3.6.1" />
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Filtering\Filtering.csproj" />
|
||||
<ProjectReference Include="..\Common\Common.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,104 @@
|
|||
using ProSol.Messaging.Translating;
|
||||
|
||||
namespace Filtering.Tests;
|
||||
|
||||
public class FilteringTests
|
||||
{
|
||||
[Test]
|
||||
public void Endpoint_ShouldWork()
|
||||
{
|
||||
string[] input = [
|
||||
"nothing happens",
|
||||
"WARN: warning1",
|
||||
"WARN: warning2",
|
||||
"CRIT: critical",
|
||||
"nothing happens",
|
||||
];
|
||||
|
||||
var expected = new
|
||||
{
|
||||
warn = new string[] {
|
||||
"WARN: warning1",
|
||||
"WARN: warning2",
|
||||
},
|
||||
crit = new string[]
|
||||
{
|
||||
"CRIT: critical",
|
||||
},
|
||||
rest = new string[] {
|
||||
"nothing happens",
|
||||
"nothing happens"
|
||||
}
|
||||
};
|
||||
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var warnCollector = new DataSubscriber<string>();
|
||||
var critCollector = new DataSubscriber<string>();
|
||||
var restCollector = new DataSubscriber<string>();
|
||||
|
||||
publisher
|
||||
.Endpoint<string>(x => x.Contains("WARN:"))
|
||||
.Subscribe(warnCollector);
|
||||
|
||||
publisher
|
||||
.Endpoint<string>(x => x.Contains("CRIT:"))
|
||||
.Subscribe(critCollector);
|
||||
|
||||
publisher
|
||||
.Subscribe(restCollector);
|
||||
|
||||
foreach (var message in input)
|
||||
{
|
||||
publisher.Publish(message);
|
||||
}
|
||||
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(warnCollector.Messages, Is.EquivalentTo(expected.warn));
|
||||
Assert.That(critCollector.Messages, Is.EquivalentTo(expected.crit));
|
||||
Assert.That(restCollector.Messages, Is.EquivalentTo(expected.rest));
|
||||
});
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Filter_ShouldWork()
|
||||
{
|
||||
string[] input = [
|
||||
"nothing happens",
|
||||
"WARN: warning1",
|
||||
"WARN: warning2",
|
||||
"CRIT: critical",
|
||||
"nothing happens",
|
||||
];
|
||||
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var warnsCountCollector = new DataSubscriber<bool>();
|
||||
var critsCountCollector = new DataSubscriber<bool>();
|
||||
var restCollector = new DataSubscriber<string>();
|
||||
|
||||
publisher
|
||||
.Filter<string>(x => x.Contains("WARN:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(warnsCountCollector);
|
||||
|
||||
publisher
|
||||
.Filter<string>(x => x.Contains("CRIT:"))
|
||||
.Translate<string, bool>(x => true)
|
||||
.Subscribe(critsCountCollector);
|
||||
|
||||
publisher
|
||||
.Subscribe(restCollector);
|
||||
|
||||
foreach (var message in input)
|
||||
{
|
||||
publisher.Publish(message);
|
||||
}
|
||||
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(warnsCountCollector.Messages, Has.Length.EqualTo(2));
|
||||
Assert.That(critsCountCollector.Messages, Has.Length.EqualTo(1));
|
||||
Assert.That(restCollector.Messages, Is.EquivalentTo(input));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
global using NUnit.Framework;
|
||||
global using ProSol.Messaging;
|
||||
global using ProSol.Messaging.Filtering;
|
||||
global using ProSol.Messaging.Tests.Common;
|
|
@ -0,0 +1,21 @@
|
|||
namespace Filtering.Tests;
|
||||
|
||||
public class IntegrationTests
|
||||
{
|
||||
[Test]
|
||||
public void Check_Pipeline_Chain_Works()
|
||||
{
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var data = new DataSubscriber<string>();
|
||||
|
||||
publisher
|
||||
.Filter<string>(x => x.StartsWith("CRIT"))
|
||||
.Subscribe(data);
|
||||
|
||||
publisher.Publish("CRITICAL message");
|
||||
publisher.Publish("Verbose message");
|
||||
|
||||
Assert.That(data.Messages, Has.Length.EqualTo(1));
|
||||
Assert.That(data.Messages.First(), Is.EqualTo("CRITICAL message"));
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Common\Common.csproj" />
|
||||
<ProjectReference Include="..\..\src\Subscriptions\Subscriptions.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
using ProSol.Messaging;
|
||||
using ProSol.Messaging.Translating;
|
||||
using ProSol.Messaging.Filtering;
|
||||
|
||||
namespace Translating.Tests;
|
||||
|
||||
public class IntegrationTests
|
||||
{
|
||||
[Test]
|
||||
public void Check_Pipeline_Chain_Works()
|
||||
{
|
||||
var publisher = new PipelineMessagePublisher<string>();
|
||||
var data = new DataSubscriber<int>();
|
||||
|
||||
publisher
|
||||
.Translate<string, int>(x => x.Length)
|
||||
.Subscribe(data);
|
||||
|
||||
publisher.Publish("FOO");
|
||||
|
||||
Assert.That(data.Messages, Has.Length.EqualTo(1));
|
||||
Assert.That(data.Messages.First(), Is.EqualTo(3));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Check_Pipeline_Fallback_Chain_WithTranslators_Works()
|
||||
{
|
||||
var publisher = new PipelineMessagePublisher<int>();
|
||||
var crits = new DataSubscriber<string>();
|
||||
var notCrits = new DataSubscriber<string>();
|
||||
|
||||
publisher
|
||||
.Translate<int, string>(x => x > 10 ? "CRITICAL message" : "")
|
||||
.Endpoint<string>(x => x.StartsWith("CRIT"))
|
||||
.Subscribe(crits);
|
||||
|
||||
publisher
|
||||
.Translate<int, string>( x => $"Verbose message {x}")
|
||||
.Subscribe(notCrits);
|
||||
|
||||
publisher.Publish(9);
|
||||
publisher.Publish(11);
|
||||
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(crits.Messages, Has.Length.EqualTo(1));
|
||||
Assert.That(crits.Messages.First(), Is.EqualTo("CRITICAL message"));
|
||||
|
||||
Assert.That(notCrits.Messages, Has.Length.EqualTo(1));
|
||||
Assert.That(notCrits.Messages.First(), Is.EqualTo("Verbose message 9"));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
using ProSol.Messaging;
|
||||
using ProSol.Messaging.Tests.Common;
|
||||
using ProSol.Messaging.Translating;
|
||||
|
||||
namespace Messaging.Tests;
|
||||
namespace ProSol.Messaging.Tests;
|
||||
|
||||
public class MessageTranslatorTests
|
||||
{
|
||||
|
@ -14,37 +14,16 @@ public class MessageTranslatorTests
|
|||
|
||||
// Arrange operations.
|
||||
var dataSubscriber = new DataSubscriber<bool>();
|
||||
var publisher = new TestPublisher<string>();
|
||||
var publisher = new TestPublisher<string>(input);
|
||||
using var unsub = publisher
|
||||
.Translate<string, int>(x => x.Length)
|
||||
.Translate<int, bool>(x => x > 5)
|
||||
.Subscribe(dataSubscriber);
|
||||
|
||||
// Act.
|
||||
foreach(var item in input)
|
||||
{
|
||||
publisher.Publish(item);
|
||||
}
|
||||
publisher.PublishAll();
|
||||
|
||||
// Assert.
|
||||
Assert.That(dataSubscriber.Messages, Is.EquivalentTo(expected));
|
||||
}
|
||||
|
||||
public class TestPublisher<TData> : IPublisher<TData>
|
||||
{
|
||||
private ISubscriber? subscriber;
|
||||
|
||||
public void Publish(TData message)
|
||||
{
|
||||
PublishHelper.Publish(message, subscriber!);
|
||||
}
|
||||
|
||||
public IDisposable Subscribe(ISubscriber subscriber)
|
||||
{
|
||||
this.subscriber = subscriber;
|
||||
#pragma warning disable CS8603
|
||||
return null;
|
||||
#pragma warning restore CS8603
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,7 +18,9 @@
|
|||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Filtering\Filtering.csproj" />
|
||||
<ProjectReference Include="..\..\src\Translating\Translating.csproj" />
|
||||
<ProjectReference Include="..\Common\Common.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
Loading…
Reference in New Issue