diff --git a/Directory.Build.targets b/Directory.Build.targets
new file mode 100644
index 0000000..77dcd79
--- /dev/null
+++ b/Directory.Build.targets
@@ -0,0 +1,22 @@
+
+
+
+
+
+
+
+
+
+
+ <_Parameter1>%(InternalsVisibleTo.Identity)
+
+
+
+
+
+
+ <_Parameter1>$(AssemblyName)%(InternalsVisibleToSuffix.Identity)
+
+
+
+
\ No newline at end of file
diff --git a/Messaging.code-workspace b/Messaging.code-workspace
new file mode 100644
index 0000000..ffef029
--- /dev/null
+++ b/Messaging.code-workspace
@@ -0,0 +1,57 @@
+// Please use this file for new Modules.
+// Remove .txt and put in root of module.
+// And remove this comment.
+{
+ "folders": [
+ {
+ "path": "."
+ }
+ ],
+ "tasks": {
+ "version": "2.0.0",
+ "presentation":{
+ "reveal": "silent"
+ },
+ "tasks": [
+ {
+ "label": "dotnet: build",
+ "type": "shell",
+ "command": "dotnet",
+ "args": [
+ "build",
+ "/property:GenerateFullPaths=true",
+ "/consoleloggerparameters:NoSummary"
+ ],
+ "problemMatcher": [
+ "$msCompile"
+ ],
+ "group": {
+ "kind": "build",
+ "isDefault": true
+ },
+ },
+ {
+ "label": "dotnet: test",
+ "type": "shell",
+ "command": "dotnet",
+ "args": [
+ "test"
+ ],
+ "problemMatcher": [
+ "$msCompile"
+ ],
+ "group": {
+ "kind": "test",
+ "isDefault": true
+ },
+ }
+ ]
+ },
+ "extensions": {
+ "recommendations": [
+ "ms-dotnettools.vscode-dotnet-runtime",
+ "ms-dotnettools.csharp",
+ "ms-dotnettools.csdevkit",
+ ]
+ }
+}
\ No newline at end of file
diff --git a/Messaging.sln b/Messaging.sln
new file mode 100644
index 0000000..98ea707
--- /dev/null
+++ b/Messaging.sln
@@ -0,0 +1,64 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.31903.59
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{C9061001-B6D0-49F8-AA95-5D421E96DDA2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contracts", "src\Contracts\Contracts.csproj", "{FF4D591C-67FE-4E8E-AD73-D13465D6DB44}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publishers", "src\Publishers\Publishers.csproj", "{4856538B-EE02-4568-B133-4612B01601B0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating", "src\Translating\Translating.csproj", "{5549AC68-4155-4E42-B88C-672A8D39E420}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions", "src\Subscriptions\Subscriptions.csproj", "{08D3A876-4D42-49AC-AD6D-E597F9B89822}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating.Tests", "tests\Translating\Translating.Tests.csproj", "{E33F062A-2120-4AF0-8688-330A8CA7F0B0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions.Tests", "tests\Subscriptions\Subscriptions.Tests.csproj", "{90879E74-5F24-4ED6-9988-6FFFFE38B96A}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4856538B-EE02-4568-B133-4612B01601B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4856538B-EE02-4568-B133-4612B01601B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4856538B-EE02-4568-B133-4612B01601B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4856538B-EE02-4568-B133-4612B01601B0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5549AC68-4155-4E42-B88C-672A8D39E420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {5549AC68-4155-4E42-B88C-672A8D39E420}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.Build.0 = Release|Any CPU
+ {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {FF4D591C-67FE-4E8E-AD73-D13465D6DB44} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
+ {4856538B-EE02-4568-B133-4612B01601B0} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
+ {5549AC68-4155-4E42-B88C-672A8D39E420} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
+ {08D3A876-4D42-49AC-AD6D-E597F9B89822} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2}
+ {E33F062A-2120-4AF0-8688-330A8CA7F0B0} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
+ {90879E74-5F24-4ED6-9988-6FFFFE38B96A} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}
+ EndGlobalSection
+EndGlobal
diff --git a/README.md b/README.md
index 45e8a17..8f5fd0e 100644
--- a/README.md
+++ b/README.md
@@ -1,77 +1,9 @@
# ProSol.Messaging
Implements a message broker with ability to build a pipeline of listeners.
+The project consists of:
+- observer-related interfaces,
+- advanced subscriptions,
+- pipeline-based publisher
+- translators for a messages.
-## API
-
-The project contains an API for the `Observer pattern`: `IPublisher`, `ISubscriber`, and `PipelinePublisher`.
-
-- The `IPublisher` interface is aimed to provide a layer of absctraction to the client.
-- The `ISubscriber` interface decalres a subscription contract with chained calls support via `NextDelegate`.
- - `NextDelegate`: is a **mandatory** delegate to trigger if current subscriber wants to bypass a message to next subscriber.
-- `PipelinePublisher`: a ready-to-use implementation for chained subscriptions.
-
-## Demo
-
-Let's see how works in a console app,
-by making a publisher for strings,
-which addresses the message to the StringListener,
-but there is a StringLimitListener between them.
-
-So, the StringLimitListener will drop the message longer than "Please" string:
-
-```csharp
-using ProSol.Messaging;
-
-var maxLength = "Please".Length + 1;
-
-var broker = new MessageBroker();
-// Set up a chain.
-broker.Subscribe(new StringLimitListener(maxLength));
-broker.Subscribe(new StringListener());
-
-// Push some messages.
-broker.Publish("Please");
-broker.Publish("STOP");
-broker.Publish("The planet");
-// OUTPUT:
-// Please
-// STOP
-
-// Clean up.
-broker.Complete();
-```
-
-```csharp
-public class StringListener : ISubscriber
-{
- public void OnCompleted() { }
-
- public void OnNext(string message, NextDelegate next)
- {
- Console.WriteLine(message);
-
- if (message != "STOP")
- {
- next();
- }
- }
-}
-```
-
-```csharp
-public class StringLimitListener(int limit) : ISubscriber
-{
- private readonly int limit = limit;
-
- public void OnCompleted() { }
-
- public void OnNext(string message, NextDelegate next)
- {
- if (message.Length < limit)
- {
- next();
- }
- }
-}
-```
\ No newline at end of file
diff --git a/docs/Subscriptions.md b/docs/Subscriptions.md
new file mode 100644
index 0000000..f49c481
--- /dev/null
+++ b/docs/Subscriptions.md
@@ -0,0 +1,83 @@
+# ProSol.Messaging.Subscriptions
+
+A set of wrappers for a Subscriber, which represent the connection between Publisher and Subscriber - the Subsription.
+The Subscription is a smart trigger for a Subscriber, providing the conditions for triggering.
+
+## FilteredSubscription
+
+The `FilteredSubscription`, triggers the Subscriber when conditions are met: `All` or `Any`.
+
+## Demo
+
+Lets create a notification when an error is logged.
+
+Add the package:
+```
+dotnet add package ProSol.Messaging.Subscriptions
+```
+
+Include the package:
+```csharp
+using ProSol.Messaging;
+using ProSol.Messaging.Subscriptions;
+```
+
+Now, lets build a system, which reacts on errors:
+```csharp
+// Declare the predicates.
+Predicate[] predicates = [
+ x => x.Contains("ERROR"),
+ x => x.Contains("CRITICAL")
+];
+
+// Wire up a dummy string subscriber.
+var subscriber = new StringSubscriber(message => Console.WriteLine(message));
+
+// Create an Any-subscription.
+var subscription = new FilteredSubscription
+ .Any(
+ subscriber,
+ predicates);
+
+// Create the publisher and subscription.
+var publisher = new PipelinePublisher();
+publisher.Subscribe(subscription);
+```
+
+Now, lets simulate some logs:
+
+```csharp
+string[] source = [
+ "quick brown fox",
+ "introduced an ERROR",
+ "so the lazy dog",
+ "got a CRITICAL message"
+];
+
+foreach (var item in source)
+{
+ publisher.Publish(item);
+}
+
+// OUTPUT:
+// introduced an ERROR
+// got a CRITICAL message
+```
+
+```csharp
+public class StringSubscriber(Action action) : ISubscriber
+{
+ public void OnCompleted() { }
+
+ public void OnNext(string message, NextDelegate next)
+ {
+ action(message);
+ next();
+ }
+}
+```
+
+That's it! Just wrap a `FilteredSubscription` around a `Subscriber`, subscribe it on `Publisher` and run `publisher.Publish`.
+Please refer to [tests folder](/tests/) for more cases.
+
+Happy coding!
\ No newline at end of file
diff --git a/pack/ProSol.Messaging.csproj b/pack/ProSol.Messaging.csproj
new file mode 100644
index 0000000..a1a5fb6
--- /dev/null
+++ b/pack/ProSol.Messaging.csproj
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+ net8.0
+ enable
+ enable
+
+
diff --git a/pack/nuget-pack.sh b/pack/nuget-pack.sh
new file mode 100755
index 0000000..714d39e
--- /dev/null
+++ b/pack/nuget-pack.sh
@@ -0,0 +1,2 @@
+rm -rf .nuget
+dotnet pack -c Release -p:NuspecFile=package.nuspec --include-source -o=.nuget
diff --git a/src/package.nuspec b/pack/package.nuspec
similarity index 83%
rename from src/package.nuspec
rename to pack/package.nuspec
index 3af84e8..286bfad 100644
--- a/src/package.nuspec
+++ b/pack/package.nuspec
@@ -3,7 +3,7 @@
ProSol.Messaging
ProSol.Messaging
- 4.0.0-rc.2
+ 4.0.0-rc.5.0
Alex Kozachenko
Alex Kozachenko
https://git.disroot.org/alexenko/ProSol.Messaging
@@ -20,7 +20,8 @@
tool observer observer-pattern design-patterns
-
+
+
-
\ No newline at end of file
+
diff --git a/src.sln b/src.sln
deleted file mode 100644
index 258c4bd..0000000
--- a/src.sln
+++ /dev/null
@@ -1,28 +0,0 @@
-
-Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 17
-VisualStudioVersion = 17.0.31903.59
-MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProSol.Messaging", "src\Messaging.csproj", "{89203112-653F-4A81-A3B9-9B711E5F6A6B}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.Tests", "tests\Messaging.Tests.csproj", "{9D9B8827-380D-4AE5-8BA5-F4056935C13A}"
-EndProject
-Global
- GlobalSection(SolutionConfigurationPlatforms) = preSolution
- Debug|Any CPU = Debug|Any CPU
- Release|Any CPU = Release|Any CPU
- EndGlobalSection
- GlobalSection(SolutionProperties) = preSolution
- HideSolutionNode = FALSE
- EndGlobalSection
- GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Release|Any CPU.Build.0 = Release|Any CPU
- {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Release|Any CPU.Build.0 = Release|Any CPU
- EndGlobalSection
-EndGlobal
diff --git a/src/Contracts/Contracts.csproj b/src/Contracts/Contracts.csproj
new file mode 100644
index 0000000..b007a4d
--- /dev/null
+++ b/src/Contracts/Contracts.csproj
@@ -0,0 +1,8 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
diff --git a/src/IPublisher.cs b/src/Contracts/Publishers/IPublisher.cs
similarity index 73%
rename from src/IPublisher.cs
rename to src/Contracts/Publishers/IPublisher.cs
index eaf2e45..f501961 100644
--- a/src/IPublisher.cs
+++ b/src/Contracts/Publishers/IPublisher.cs
@@ -3,12 +3,12 @@ namespace ProSol.Messaging;
///
/// A Publisher/Observable object.
///
-public interface IPublisher
+public interface IPublisher
{
///
/// Subscribe a subscriber.
///
/// Target.
/// Custom IDisposable implementation.
- IDisposable Subscribe(ISubscriber subscriber);
+ IDisposable Subscribe(ISubscriber subscriber);
}
diff --git a/src/Contracts/Publishers/IPublisher_T.cs b/src/Contracts/Publishers/IPublisher_T.cs
new file mode 100644
index 0000000..ccff95a
--- /dev/null
+++ b/src/Contracts/Publishers/IPublisher_T.cs
@@ -0,0 +1,6 @@
+namespace ProSol.Messaging;
+
+public interface IPublisher : IPublisher
+{
+ void Publish(TMessage message);
+}
diff --git a/src/Contracts/Publishers/PublishHelper.cs b/src/Contracts/Publishers/PublishHelper.cs
new file mode 100644
index 0000000..e5f55e6
--- /dev/null
+++ b/src/Contracts/Publishers/PublishHelper.cs
@@ -0,0 +1,26 @@
+namespace ProSol.Messaging;
+
+public static class PublishHelper
+{
+ public static void Publish(TMessage message, params ISubscriber[] subscribers)
+ {
+ switch (subscribers.FirstOrDefault())
+ {
+ case null: return;
+ case IPipelineSubscriber subscriber:
+ {
+ void next() => Publish(message, subscribers[1..]);
+ subscriber.OnNext(message, next);
+ return;
+ }
+ case ISubscriber final:
+ {
+ final.OnNext(message);
+ return;
+ }
+ }
+ }
+
+ public static void Publish(TMessage message, ISubscriber subscriber)
+ => Publish(message, [subscriber]);
+}
\ No newline at end of file
diff --git a/src/Contracts/Subscribers/DataSubscriber.cs b/src/Contracts/Subscribers/DataSubscriber.cs
new file mode 100644
index 0000000..eaa89f6
--- /dev/null
+++ b/src/Contracts/Subscribers/DataSubscriber.cs
@@ -0,0 +1,20 @@
+using System.Collections.Immutable;
+
+namespace ProSol.Messaging;
+
+public sealed class DataSubscriber : ISubscriber
+{
+ private readonly List messages = [];
+
+ public ImmutableArray Messages => [..messages];
+
+ public void OnCompleted()
+ {
+ messages.Clear();
+ }
+
+ public void OnNext(TMessage message)
+ {
+ messages.Add(message);
+ }
+}
\ No newline at end of file
diff --git a/src/ISubscriber.cs b/src/Contracts/Subscribers/IPipelineSubscriber.cs
similarity index 77%
rename from src/ISubscriber.cs
rename to src/Contracts/Subscribers/IPipelineSubscriber.cs
index 1193220..f8c4c3a 100644
--- a/src/ISubscriber.cs
+++ b/src/Contracts/Subscribers/IPipelineSubscriber.cs
@@ -3,9 +3,9 @@ namespace ProSol.Messaging;
public delegate void NextDelegate();
///
-/// An Observer/Subscriber object.
+/// An Observer/Subscriber object with pipeline support.
///
-public interface ISubscriber
+public interface IPipelineSubscriber : ISubscriber
{
///
/// Represents a reaction on a message.
@@ -16,9 +16,4 @@ public interface ISubscriber
/// - For more details about ASP.NET pipeline, see https://learn.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-8.0
///
void OnNext(TMessage message, NextDelegate next);
-
- ///
- /// Finalizes a subscriber.
- ///
- void OnCompleted();
-}
+}
\ No newline at end of file
diff --git a/src/Contracts/Subscribers/ISubscriber.cs b/src/Contracts/Subscribers/ISubscriber.cs
new file mode 100644
index 0000000..e8c703a
--- /dev/null
+++ b/src/Contracts/Subscribers/ISubscriber.cs
@@ -0,0 +1,10 @@
+namespace ProSol.Messaging;
+
+public interface ISubscriber
+{
+ ///
+ /// Finalizes a subscriber.
+ ///
+ void OnCompleted();
+}
+
diff --git a/src/Contracts/Subscribers/ISubscriber_T.cs b/src/Contracts/Subscribers/ISubscriber_T.cs
new file mode 100644
index 0000000..04a9995
--- /dev/null
+++ b/src/Contracts/Subscribers/ISubscriber_T.cs
@@ -0,0 +1,7 @@
+namespace ProSol.Messaging;
+
+public interface ISubscriber : ISubscriber
+{
+ void OnNext(TMessage message);
+}
+
diff --git a/src/PipelineMessagePublisher.cs b/src/PipelineMessagePublisher.cs
deleted file mode 100644
index d6f89c0..0000000
--- a/src/PipelineMessagePublisher.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-using ProSol.Messaging.Translating;
-
-namespace ProSol.Messaging;
-
-///
-/// Collects a subscribers and publishes a messages to them.
-/// Provides a pipeline mechanics, so a subscriber is able to stop message processing.
-///
-/// Type of message.
-public class PipelineMessagePublisher :
- IPublisher,
- ITranslatorPublisher
-{
- private readonly HashSet> subscribers = [];
-
- ///
- /// Subscribes a subscriber.
- ///
- /// An implementation of or interfaces.
- /// Unsubscriber.
- public IDisposable Subscribe(ISubscriber subscriber)
- {
- subscribers.Add(subscriber);
- return new Unsubscriber(subscribers, subscriber);
- }
-
- ///
- /// Subscribes a subscriber in different message format.
- ///
- public IDisposable Subscribe(TranslatorBase translator)
- {
- return Subscribe((ISubscriber)translator);
- }
-
- ///
- /// Pushes the message to subscribers.
- ///
- /// A message to push to specific .
- public void Publish(TMessage message)
- {
- PublishInternal(subscribers, message);
- }
-
- ///
- /// Pushes the `complete` message, so the subscribers can finalize the subscription.
- ///
- public void Complete()
- {
- foreach (var item in subscribers)
- {
- item.OnCompleted();
- }
-
- subscribers.Clear();
- }
-
- private static void PublishInternal(IEnumerable> subscribers, TMessage message)
- {
- if (!subscribers.Any())
- {
- return;
- }
-
- void next() => PublishInternal(subscribers.Skip(1), message);
- subscribers.First().OnNext(message, next);
- }
-}
diff --git a/src/Publishers/PipelineMessagePublisher.cs b/src/Publishers/PipelineMessagePublisher.cs
new file mode 100644
index 0000000..19fc93b
--- /dev/null
+++ b/src/Publishers/PipelineMessagePublisher.cs
@@ -0,0 +1,43 @@
+namespace ProSol.Messaging;
+
+///
+/// Collects a subscribers and publishes a messages to them.
+/// Provides a pipeline mechanics, so a subscriber is able to stop message processing.
+///
+/// Type of message.
+public class PipelineMessagePublisher :
+ IPublisher
+{
+ private readonly HashSet subscribers = [];
+
+ ///
+ /// Subscribes a subscriber.
+ ///
+ /// An implementation of or interfaces.
+ /// Unsubscriber.
+ public IDisposable Subscribe(ISubscriber subscriber)
+ {
+ subscribers.Add(subscriber);
+ return new Unsubscriber(subscribers, subscriber);
+ }
+
+ ///
+ /// Pushes the message to subscribers.
+ ///
+ /// A message to push to specific .
+ public void Publish(TMessage message)
+ => PublishHelper.Publish(message, [.. subscribers]);
+
+ ///
+ /// Pushes the `complete` message, so the subscribers can finalize the subscription.
+ ///
+ public void Complete()
+ {
+ foreach (var item in subscribers)
+ {
+ item.OnCompleted();
+ }
+
+ subscribers.Clear();
+ }
+}
\ No newline at end of file
diff --git a/src/Messaging.csproj b/src/Publishers/Publishers.csproj
similarity index 51%
rename from src/Messaging.csproj
rename to src/Publishers/Publishers.csproj
index 7f217f1..6b116be 100644
--- a/src/Messaging.csproj
+++ b/src/Publishers/Publishers.csproj
@@ -1,15 +1,14 @@
+
+
+
+
+
+
net8.0
enable
enable
- preview
-
-
-
- <_Parameter1>$(AssemblyName).Tests
-
-
diff --git a/src/Publishers/Unsubscriber.cs b/src/Publishers/Unsubscriber.cs
new file mode 100644
index 0000000..bd31475
--- /dev/null
+++ b/src/Publishers/Unsubscriber.cs
@@ -0,0 +1,20 @@
+namespace ProSol.Messaging;
+
+public sealed class Unsubscriber : IDisposable
+{
+ private readonly ICollection subscribers;
+ private readonly ISubscriber target;
+
+ internal Unsubscriber(
+ ICollection subscribers,
+ ISubscriber target)
+ {
+ this.subscribers = subscribers;
+ this.target = target;
+ }
+
+ public void Dispose()
+ {
+ subscribers.Remove(target);
+ }
+}
\ No newline at end of file
diff --git a/src/Subscriptions/AllFilteredSubscription.cs b/src/Subscriptions/AllFilteredSubscription.cs
new file mode 100644
index 0000000..baf1ffd
--- /dev/null
+++ b/src/Subscriptions/AllFilteredSubscription.cs
@@ -0,0 +1,8 @@
+namespace ProSol.Messaging.Subscriptions;
+
+public sealed class AllFilteredSubscription(IPipelineSubscriber subscriber, params Predicate[] predicates)
+ : FilteredSubscriptionBase(subscriber, predicates)
+{
+ protected override bool CheckPredicates(IEnumerable> predicates, TMessage message)
+ => predicates.All(x => x(message));
+}
\ No newline at end of file
diff --git a/src/Subscriptions/AnyFilteredSubscription.cs b/src/Subscriptions/AnyFilteredSubscription.cs
new file mode 100644
index 0000000..3ef0abe
--- /dev/null
+++ b/src/Subscriptions/AnyFilteredSubscription.cs
@@ -0,0 +1,8 @@
+namespace ProSol.Messaging.Subscriptions;
+
+public sealed class AnyFilteredSubscription(IPipelineSubscriber subscriber, params Predicate[] predicates)
+ : FilteredSubscriptionBase(subscriber, predicates)
+{
+ protected override bool CheckPredicates(IEnumerable> predicates, TMessage message)
+ => predicates.Any(x => x(message));
+}
\ No newline at end of file
diff --git a/src/Subscriptions/FilteredSubscriptionBase.cs b/src/Subscriptions/FilteredSubscriptionBase.cs
new file mode 100644
index 0000000..ee5545d
--- /dev/null
+++ b/src/Subscriptions/FilteredSubscriptionBase.cs
@@ -0,0 +1,31 @@
+namespace ProSol.Messaging.Subscriptions;
+
+///
+/// Represents a subscription on messages with specific field states.
+///
+/// A type of message.
+/// A listener of a message.
+/// A set of predicates which should be complied in order to process the message.
+///
+/// - If predicates criteria not met - the NextDelegate is called anyway.
+///
+public abstract class FilteredSubscriptionBase(IPipelineSubscriber subscriber, params Predicate[] predicates)
+ : SubscriptionBase(subscriber)
+{
+ private readonly Predicate[] predicates = predicates;
+
+ protected sealed override void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next)
+ {
+ if (CheckPredicates(predicates, message))
+ {
+ subscriber.OnNext(message, next);
+ }
+ else
+ {
+ // NOTE: filter mismatch should not break a pipeline.
+ next();
+ }
+ }
+
+ protected abstract bool CheckPredicates(IEnumerable> predicates, TMessage message);
+}
\ No newline at end of file
diff --git a/src/Subscriptions/RegularSubscription.cs b/src/Subscriptions/RegularSubscription.cs
new file mode 100644
index 0000000..f28481d
--- /dev/null
+++ b/src/Subscriptions/RegularSubscription.cs
@@ -0,0 +1,11 @@
+namespace ProSol.Messaging.Subscriptions;
+
+///
+/// Represents a dummy wrapper around a .
+///
+public class RegularSubscription(IPipelineSubscriber subscriber)
+ : SubscriptionBase(subscriber)
+{
+ protected override void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next)
+ => subscriber.OnNext(message, next);
+}
\ No newline at end of file
diff --git a/src/Subscriptions/SubscriptionBase.cs b/src/Subscriptions/SubscriptionBase.cs
new file mode 100644
index 0000000..b0d98d8
--- /dev/null
+++ b/src/Subscriptions/SubscriptionBase.cs
@@ -0,0 +1,25 @@
+namespace ProSol.Messaging.Subscriptions;
+
+///
+/// Represents a wrapper around ,
+/// which allows to make extra control of message execution
+/// in child classes.
+///
+/// A type of message.
+/// A listener of a message.
+public abstract class SubscriptionBase(IPipelineSubscriber subscriber)
+ : IPipelineSubscriber
+{
+ private readonly IPipelineSubscriber subscriber = subscriber;
+
+ public void OnCompleted()
+ => OnCompleted(subscriber);
+
+ public void OnNext(TMessage message, NextDelegate next)
+ => OnNext(subscriber, message, next);
+
+ protected virtual void OnCompleted(IPipelineSubscriber subscriber)
+ => subscriber.OnCompleted();
+
+ protected abstract void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next);
+}
diff --git a/src/Subscriptions/Subscriptions.csproj b/src/Subscriptions/Subscriptions.csproj
new file mode 100644
index 0000000..13e873c
--- /dev/null
+++ b/src/Subscriptions/Subscriptions.csproj
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+ net8.0
+ enable
+ enable
+
+
diff --git a/src/Translating/ITranslatorPublisher.cs b/src/Translating/ITranslatorPublisher.cs
deleted file mode 100644
index 7bcf3ca..0000000
--- a/src/Translating/ITranslatorPublisher.cs
+++ /dev/null
@@ -1,6 +0,0 @@
-namespace ProSol.Messaging.Translating;
-
-public interface ITranslatorPublisher
-{
- IDisposable Subscribe(TranslatorBase translatorBase);
-}
\ No newline at end of file
diff --git a/src/Translating/PublisherFluentExtensions.cs b/src/Translating/PublisherFluentExtensions.cs
new file mode 100644
index 0000000..42b86a4
--- /dev/null
+++ b/src/Translating/PublisherFluentExtensions.cs
@@ -0,0 +1,13 @@
+namespace ProSol.Messaging.Translating;
+
+public static class IPublisherFluentExtensions
+{
+ public static IPublisher Translate(
+ this IPublisher publisher,
+ Func converter)
+ {
+ var result = new SimpleTranslator(converter);
+ var unsubscriber = publisher.Subscribe(result);
+ return result;
+ }
+}
diff --git a/src/Translating/SimpleTranslator.cs b/src/Translating/SimpleTranslator.cs
index 767db17..287ff79 100644
--- a/src/Translating/SimpleTranslator.cs
+++ b/src/Translating/SimpleTranslator.cs
@@ -1,7 +1,7 @@
namespace ProSol.Messaging.Translating;
-internal sealed class SimpleTranslator(ISubscriber destSubscriber, Func converter)
- : TranslatorBase(destSubscriber)
+public sealed class SimpleTranslator(Func converter)
+ : TranslatorBase
{
protected override TDest ConvertMessage(TSource message)
=> converter(message);
diff --git a/src/Translating/Translating.csproj b/src/Translating/Translating.csproj
new file mode 100644
index 0000000..13e873c
--- /dev/null
+++ b/src/Translating/Translating.csproj
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+ net8.0
+ enable
+ enable
+
+
diff --git a/src/Translating/TranslatorBase.cs b/src/Translating/TranslatorBase.cs
index ca2521b..6361ce8 100644
--- a/src/Translating/TranslatorBase.cs
+++ b/src/Translating/TranslatorBase.cs
@@ -1,19 +1,20 @@
namespace ProSol.Messaging.Translating;
-public abstract class TranslatorBase(ISubscriber destSubscriber) : ISubscriber
+public abstract class TranslatorBase
+ : IPipelineSubscriber, IPublisher, IDisposable
{
- private readonly ISubscriber destSubscriber = destSubscriber;
+ private ISubscriber? destSubscriber;
///
- public void OnCompleted() => destSubscriber.OnCompleted();
+ public void OnCompleted() => destSubscriber?.OnCompleted();
///
public void OnNext(TSource message, NextDelegate next)
{
var dest = ConvertMessage(message);
- if (dest != null)
+ if (dest != null && destSubscriber != null)
{
- destSubscriber.OnNext(dest, next);
+ PublishHelper.Publish(dest, destSubscriber);
}
else
{
@@ -31,6 +32,19 @@ public abstract class TranslatorBase(ISubscriber destSubs
///
protected abstract TDest? ConvertMessage(TSource message);
- public static TranslatorBase Create(ISubscriber destSubscriber, Func converter)
- => new SimpleTranslator(destSubscriber, converter);
+ public IDisposable Subscribe(ISubscriber subscriber)
+ {
+ if (destSubscriber != null)
+ {
+ throw new InvalidOperationException("Unable to subscribe more than one subscriber.");
+ }
+ destSubscriber = subscriber;
+ return this;
+ }
+
+ public void Dispose()
+ {
+ destSubscriber?.OnCompleted();
+ destSubscriber = null;
+ }
}
diff --git a/src/Unsubscriber.cs b/src/Unsubscriber.cs
deleted file mode 100644
index 7d59333..0000000
--- a/src/Unsubscriber.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-namespace ProSol.Messaging;
-
-public sealed class Unsubscriber : IDisposable
-{
- private readonly ICollection> subscribers;
- private readonly ISubscriber target;
-
- internal Unsubscriber(
- ICollection> subscribers,
- ISubscriber target)
- {
- this.subscribers = subscribers;
- this.target = target;
- }
-
- public void Dispose()
- {
- subscribers.Remove(target);
- }
-}
\ No newline at end of file
diff --git a/tests/MessageTranslatorTests.cs b/tests/MessageTranslatorTests.cs
deleted file mode 100644
index a50fc1b..0000000
--- a/tests/MessageTranslatorTests.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-using FakeItEasy;
-using ProSol.Messaging;
-using ProSol.Messaging.Translating;
-
-namespace Messaging.Tests;
-
-public class MessageTranslatorTests
-{
- [Test]
- public void SimpleCase_OneToOne_ShouldWork()
- {
- // Arrange configs.
- string[] input = ["Hello", "World!"];
- int[] expected = [5, 6];
- static int converter(string message) => message.Length;
- List messages = [];
-
- // Arrange operations.
- var fakeListener = A.Fake>();
- A.CallTo(() => fakeListener.OnNext(A._, A._))
- .Invokes((int i, NextDelegate next) => messages.Add(i));
-
- var publisher = new PipelineMessagePublisher();
- publisher.Subscribe(TranslatorBase.Create(fakeListener, converter));
-
- // Act.
- foreach(var item in input)
- {
- publisher.Publish(item);
- }
-
- // Assert.
- Assert.That(messages, Is.EquivalentTo(expected));
- }
-}
\ No newline at end of file
diff --git a/tests/Subscriptions/AllFilteredSubscriptionTests.cs b/tests/Subscriptions/AllFilteredSubscriptionTests.cs
new file mode 100644
index 0000000..52d3aea
--- /dev/null
+++ b/tests/Subscriptions/AllFilteredSubscriptionTests.cs
@@ -0,0 +1,41 @@
+using ProSol.Messaging.Subscriptions;
+
+namespace ProSol.Messaging.Tests.Subscriptions;
+
+public class AllFilteredSubscriptionTests
+{
+ private class IntSubscriber : IPipelineSubscriber
+ {
+ public List Numbers = [];
+ public void OnCompleted() { }
+
+ public void OnNext(int message, NextDelegate next)
+ {
+ Numbers.Add(message);
+ next();
+ }
+ }
+
+ [Test]
+ public void IntSubscriber_ShouldReceive_OnlyLimitedRange()
+ {
+ Predicate[] predicates = [
+ x => x >= 0,
+ x => x < 3
+ ];
+ int[] source = [-1, 0, 1, 2, 3, 4];
+ int[] expected = [0, 1, 2];
+
+ var subscriber = new IntSubscriber();
+ var subscription = new AllFilteredSubscription(
+ subscriber,
+ predicates);
+
+ foreach (var item in source)
+ {
+ subscription.OnNext(item, () => {});
+ }
+
+ Assert.That(subscriber.Numbers.ToArray(), Is.EquivalentTo(expected));
+ }
+}
\ No newline at end of file
diff --git a/tests/Subscriptions/AnyFilteredSubscriptionTests.cs b/tests/Subscriptions/AnyFilteredSubscriptionTests.cs
new file mode 100644
index 0000000..66b815d
--- /dev/null
+++ b/tests/Subscriptions/AnyFilteredSubscriptionTests.cs
@@ -0,0 +1,51 @@
+using ProSol.Messaging.Subscriptions;
+
+namespace ProSol.Messaging.Tests.Subscriptions;
+
+public class AnyFilteredSubscriptionTests
+{
+ private class StringSubscriber : IPipelineSubscriber
+ {
+ public List Strings = [];
+ public void OnCompleted() { }
+
+ public void OnNext(string message, NextDelegate next)
+ {
+ Strings.Add(message);
+ next();
+ }
+ }
+
+ [Test]
+ public void Pipeline_ShouldStop_OnAny_Keyword()
+ {
+ Predicate[] predicates = [
+ x => x.Contains("ERROR"),
+ x => x.Contains("CRITICAL")
+ ];
+
+ string[] source = [
+ "quick brown fox",
+ "introduced an ERROR",
+ "so the lazy dog",
+ "got a CRITICAL message"
+ ];
+
+ string[] expected = [
+ "introduced an ERROR",
+ "got a CRITICAL message"
+ ];
+
+ var subscriber = new StringSubscriber();
+ var subscription = new AnyFilteredSubscription(
+ subscriber,
+ predicates);
+
+ foreach (var item in source)
+ {
+ subscription.OnNext(item, () => {});
+ }
+
+ Assert.That(subscriber.Strings.ToArray(), Is.EquivalentTo(expected));
+ }
+}
\ No newline at end of file
diff --git a/tests/GlobalUsings.cs b/tests/Subscriptions/GlobalUsings.cs
similarity index 100%
rename from tests/GlobalUsings.cs
rename to tests/Subscriptions/GlobalUsings.cs
diff --git a/tests/Messaging.Tests.csproj b/tests/Subscriptions/Subscriptions.Tests.csproj
similarity index 82%
rename from tests/Messaging.Tests.csproj
rename to tests/Subscriptions/Subscriptions.Tests.csproj
index cbda234..991c21e 100644
--- a/tests/Messaging.Tests.csproj
+++ b/tests/Subscriptions/Subscriptions.Tests.csproj
@@ -10,7 +10,6 @@
-
@@ -19,7 +18,7 @@
-
+
diff --git a/tests/Translating/GlobalUsings.cs b/tests/Translating/GlobalUsings.cs
new file mode 100644
index 0000000..cefced4
--- /dev/null
+++ b/tests/Translating/GlobalUsings.cs
@@ -0,0 +1 @@
+global using NUnit.Framework;
\ No newline at end of file
diff --git a/tests/Translating/MessageTranslatorTests.cs b/tests/Translating/MessageTranslatorTests.cs
new file mode 100644
index 0000000..ea787e0
--- /dev/null
+++ b/tests/Translating/MessageTranslatorTests.cs
@@ -0,0 +1,51 @@
+using ProSol.Messaging;
+using ProSol.Messaging.Translating;
+
+namespace Messaging.Tests;
+
+public class MessageTranslatorTests
+{
+ [Test]
+ public void Translating_MultipleChains_ShouldWork()
+ {
+ // Arrange configs.
+ string[] input = ["Hello", "World!"];
+ bool[] expected = [false, true];
+
+ // Arrange operations.
+ var dataSubscriber = new DataSubscriber();
+
+ var publisher = new TestPublisher();
+ using var unsub = publisher
+ .Translate(x => x.Length)
+ .Translate(x => x > 5)
+ .Subscribe(dataSubscriber);
+
+ // Act.
+ foreach(var item in input)
+ {
+ publisher.Publish(item);
+ }
+
+ // Assert.
+ Assert.That(dataSubscriber.Messages, Is.EquivalentTo(expected));
+ }
+
+ public class TestPublisher : IPublisher
+ {
+ private ISubscriber? subscriber;
+
+ public void Publish(TData message)
+ {
+ PublishHelper.Publish(message, subscriber!);
+ }
+
+ public IDisposable Subscribe(ISubscriber subscriber)
+ {
+ this.subscriber = subscriber;
+ #pragma warning disable CS8603
+ return null;
+ #pragma warning restore CS8603
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Translating/Translating.Tests.csproj b/tests/Translating/Translating.Tests.csproj
new file mode 100644
index 0000000..8704e38
--- /dev/null
+++ b/tests/Translating/Translating.Tests.csproj
@@ -0,0 +1,24 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+ false
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+