diff --git a/Directory.Build.targets b/Directory.Build.targets new file mode 100644 index 0000000..77dcd79 --- /dev/null +++ b/Directory.Build.targets @@ -0,0 +1,22 @@ + + + + + + + + + + + <_Parameter1>%(InternalsVisibleTo.Identity) + + + + + + + <_Parameter1>$(AssemblyName)%(InternalsVisibleToSuffix.Identity) + + + + \ No newline at end of file diff --git a/Messaging.code-workspace b/Messaging.code-workspace new file mode 100644 index 0000000..ffef029 --- /dev/null +++ b/Messaging.code-workspace @@ -0,0 +1,57 @@ +// Please use this file for new Modules. +// Remove .txt and put in root of module. +// And remove this comment. +{ + "folders": [ + { + "path": "." + } + ], + "tasks": { + "version": "2.0.0", + "presentation":{ + "reveal": "silent" + }, + "tasks": [ + { + "label": "dotnet: build", + "type": "shell", + "command": "dotnet", + "args": [ + "build", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": [ + "$msCompile" + ], + "group": { + "kind": "build", + "isDefault": true + }, + }, + { + "label": "dotnet: test", + "type": "shell", + "command": "dotnet", + "args": [ + "test" + ], + "problemMatcher": [ + "$msCompile" + ], + "group": { + "kind": "test", + "isDefault": true + }, + } + ] + }, + "extensions": { + "recommendations": [ + "ms-dotnettools.vscode-dotnet-runtime", + "ms-dotnettools.csharp", + "ms-dotnettools.csdevkit", + ] + } +} \ No newline at end of file diff --git a/Messaging.sln b/Messaging.sln new file mode 100644 index 0000000..98ea707 --- /dev/null +++ b/Messaging.sln @@ -0,0 +1,64 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31903.59 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{C9061001-B6D0-49F8-AA95-5D421E96DDA2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Contracts", "src\Contracts\Contracts.csproj", "{FF4D591C-67FE-4E8E-AD73-D13465D6DB44}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publishers", "src\Publishers\Publishers.csproj", "{4856538B-EE02-4568-B133-4612B01601B0}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating", "src\Translating\Translating.csproj", "{5549AC68-4155-4E42-B88C-672A8D39E420}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions", "src\Subscriptions\Subscriptions.csproj", "{08D3A876-4D42-49AC-AD6D-E597F9B89822}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{A6E6BB70-923B-4A64-A5E6-8AFE18B535AA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Translating.Tests", "tests\Translating\Translating.Tests.csproj", "{E33F062A-2120-4AF0-8688-330A8CA7F0B0}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriptions.Tests", "tests\Subscriptions\Subscriptions.Tests.csproj", "{90879E74-5F24-4ED6-9988-6FFFFE38B96A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FF4D591C-67FE-4E8E-AD73-D13465D6DB44}.Release|Any CPU.Build.0 = Release|Any CPU + {4856538B-EE02-4568-B133-4612B01601B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4856538B-EE02-4568-B133-4612B01601B0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4856538B-EE02-4568-B133-4612B01601B0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4856538B-EE02-4568-B133-4612B01601B0}.Release|Any CPU.Build.0 = Release|Any CPU + {5549AC68-4155-4E42-B88C-672A8D39E420}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5549AC68-4155-4E42-B88C-672A8D39E420}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5549AC68-4155-4E42-B88C-672A8D39E420}.Release|Any CPU.Build.0 = Release|Any CPU + {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Debug|Any CPU.Build.0 = Debug|Any CPU + {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.ActiveCfg = Release|Any CPU + {08D3A876-4D42-49AC-AD6D-E597F9B89822}.Release|Any CPU.Build.0 = Release|Any CPU + {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E33F062A-2120-4AF0-8688-330A8CA7F0B0}.Release|Any CPU.Build.0 = Release|Any CPU + {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {90879E74-5F24-4ED6-9988-6FFFFE38B96A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {FF4D591C-67FE-4E8E-AD73-D13465D6DB44} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2} + {4856538B-EE02-4568-B133-4612B01601B0} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2} + {5549AC68-4155-4E42-B88C-672A8D39E420} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2} + {08D3A876-4D42-49AC-AD6D-E597F9B89822} = {C9061001-B6D0-49F8-AA95-5D421E96DDA2} + {E33F062A-2120-4AF0-8688-330A8CA7F0B0} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA} + {90879E74-5F24-4ED6-9988-6FFFFE38B96A} = {A6E6BB70-923B-4A64-A5E6-8AFE18B535AA} + EndGlobalSection +EndGlobal diff --git a/README.md b/README.md index 45e8a17..8f5fd0e 100644 --- a/README.md +++ b/README.md @@ -1,77 +1,9 @@ # ProSol.Messaging Implements a message broker with ability to build a pipeline of listeners. +The project consists of: +- observer-related interfaces, +- advanced subscriptions, +- pipeline-based publisher +- translators for a messages. -## API - -The project contains an API for the `Observer pattern`: `IPublisher`, `ISubscriber`, and `PipelinePublisher`. - -- The `IPublisher` interface is aimed to provide a layer of absctraction to the client. -- The `ISubscriber` interface decalres a subscription contract with chained calls support via `NextDelegate`. - - `NextDelegate`: is a **mandatory** delegate to trigger if current subscriber wants to bypass a message to next subscriber. -- `PipelinePublisher`: a ready-to-use implementation for chained subscriptions. - -## Demo - -Let's see how works in a console app, -by making a publisher for strings, -which addresses the message to the StringListener, -but there is a StringLimitListener between them. - -So, the StringLimitListener will drop the message longer than "Please" string: - -```csharp -using ProSol.Messaging; - -var maxLength = "Please".Length + 1; - -var broker = new MessageBroker(); -// Set up a chain. -broker.Subscribe(new StringLimitListener(maxLength)); -broker.Subscribe(new StringListener()); - -// Push some messages. -broker.Publish("Please"); -broker.Publish("STOP"); -broker.Publish("The planet"); -// OUTPUT: -// Please -// STOP - -// Clean up. -broker.Complete(); -``` - -```csharp -public class StringListener : ISubscriber -{ - public void OnCompleted() { } - - public void OnNext(string message, NextDelegate next) - { - Console.WriteLine(message); - - if (message != "STOP") - { - next(); - } - } -} -``` - -```csharp -public class StringLimitListener(int limit) : ISubscriber -{ - private readonly int limit = limit; - - public void OnCompleted() { } - - public void OnNext(string message, NextDelegate next) - { - if (message.Length < limit) - { - next(); - } - } -} -``` \ No newline at end of file diff --git a/docs/Subscriptions.md b/docs/Subscriptions.md new file mode 100644 index 0000000..f49c481 --- /dev/null +++ b/docs/Subscriptions.md @@ -0,0 +1,83 @@ +# ProSol.Messaging.Subscriptions + +A set of wrappers for a Subscriber, which represent the connection between Publisher and Subscriber - the Subsription. +The Subscription is a smart trigger for a Subscriber, providing the conditions for triggering. + +## FilteredSubscription + +The `FilteredSubscription`, triggers the Subscriber when conditions are met: `All` or `Any`. + +## Demo + +Lets create a notification when an error is logged. + +Add the package: +``` +dotnet add package ProSol.Messaging.Subscriptions +``` + +Include the package: +```csharp +using ProSol.Messaging; +using ProSol.Messaging.Subscriptions; +``` + +Now, lets build a system, which reacts on errors: +```csharp +// Declare the predicates. +Predicate[] predicates = [ + x => x.Contains("ERROR"), + x => x.Contains("CRITICAL") +]; + +// Wire up a dummy string subscriber. +var subscriber = new StringSubscriber(message => Console.WriteLine(message)); + +// Create an Any-subscription. +var subscription = new FilteredSubscription + .Any( + subscriber, + predicates); + +// Create the publisher and subscription. +var publisher = new PipelinePublisher(); +publisher.Subscribe(subscription); +``` + +Now, lets simulate some logs: + +```csharp +string[] source = [ + "quick brown fox", + "introduced an ERROR", + "so the lazy dog", + "got a CRITICAL message" +]; + +foreach (var item in source) +{ + publisher.Publish(item); +} + +// OUTPUT: +// introduced an ERROR +// got a CRITICAL message +``` + +```csharp +public class StringSubscriber(Action action) : ISubscriber +{ + public void OnCompleted() { } + + public void OnNext(string message, NextDelegate next) + { + action(message); + next(); + } +} +``` + +That's it! Just wrap a `FilteredSubscription` around a `Subscriber`, subscribe it on `Publisher` and run `publisher.Publish`. +Please refer to [tests folder](/tests/) for more cases. + +Happy coding! \ No newline at end of file diff --git a/pack/ProSol.Messaging.csproj b/pack/ProSol.Messaging.csproj new file mode 100644 index 0000000..a1a5fb6 --- /dev/null +++ b/pack/ProSol.Messaging.csproj @@ -0,0 +1,15 @@ + + + + + + + + + + + net8.0 + enable + enable + + diff --git a/pack/nuget-pack.sh b/pack/nuget-pack.sh new file mode 100755 index 0000000..714d39e --- /dev/null +++ b/pack/nuget-pack.sh @@ -0,0 +1,2 @@ +rm -rf .nuget +dotnet pack -c Release -p:NuspecFile=package.nuspec --include-source -o=.nuget diff --git a/src/package.nuspec b/pack/package.nuspec similarity index 83% rename from src/package.nuspec rename to pack/package.nuspec index 3af84e8..286bfad 100644 --- a/src/package.nuspec +++ b/pack/package.nuspec @@ -3,7 +3,7 @@ ProSol.Messaging ProSol.Messaging - 4.0.0-rc.2 + 4.0.0-rc.5.0 Alex Kozachenko Alex Kozachenko https://git.disroot.org/alexenko/ProSol.Messaging @@ -20,7 +20,8 @@ tool observer observer-pattern design-patterns - + + - \ No newline at end of file + diff --git a/src.sln b/src.sln deleted file mode 100644 index 258c4bd..0000000 --- a/src.sln +++ /dev/null @@ -1,28 +0,0 @@ - -Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 17 -VisualStudioVersion = 17.0.31903.59 -MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProSol.Messaging", "src\Messaging.csproj", "{89203112-653F-4A81-A3B9-9B711E5F6A6B}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.Tests", "tests\Messaging.Tests.csproj", "{9D9B8827-380D-4AE5-8BA5-F4056935C13A}" -EndProject -Global - GlobalSection(SolutionConfigurationPlatforms) = preSolution - Debug|Any CPU = Debug|Any CPU - Release|Any CPU = Release|Any CPU - EndGlobalSection - GlobalSection(SolutionProperties) = preSolution - HideSolutionNode = FALSE - EndGlobalSection - GlobalSection(ProjectConfigurationPlatforms) = postSolution - {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Debug|Any CPU.Build.0 = Debug|Any CPU - {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Release|Any CPU.ActiveCfg = Release|Any CPU - {89203112-653F-4A81-A3B9-9B711E5F6A6B}.Release|Any CPU.Build.0 = Release|Any CPU - {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Debug|Any CPU.Build.0 = Debug|Any CPU - {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Release|Any CPU.ActiveCfg = Release|Any CPU - {9D9B8827-380D-4AE5-8BA5-F4056935C13A}.Release|Any CPU.Build.0 = Release|Any CPU - EndGlobalSection -EndGlobal diff --git a/src/Contracts/Contracts.csproj b/src/Contracts/Contracts.csproj new file mode 100644 index 0000000..b007a4d --- /dev/null +++ b/src/Contracts/Contracts.csproj @@ -0,0 +1,8 @@ + + + + net8.0 + enable + enable + + diff --git a/src/IPublisher.cs b/src/Contracts/Publishers/IPublisher.cs similarity index 73% rename from src/IPublisher.cs rename to src/Contracts/Publishers/IPublisher.cs index eaf2e45..f501961 100644 --- a/src/IPublisher.cs +++ b/src/Contracts/Publishers/IPublisher.cs @@ -3,12 +3,12 @@ namespace ProSol.Messaging; /// /// A Publisher/Observable object. /// -public interface IPublisher +public interface IPublisher { /// /// Subscribe a subscriber. /// /// Target. /// Custom IDisposable implementation. - IDisposable Subscribe(ISubscriber subscriber); + IDisposable Subscribe(ISubscriber subscriber); } diff --git a/src/Contracts/Publishers/IPublisher_T.cs b/src/Contracts/Publishers/IPublisher_T.cs new file mode 100644 index 0000000..ccff95a --- /dev/null +++ b/src/Contracts/Publishers/IPublisher_T.cs @@ -0,0 +1,6 @@ +namespace ProSol.Messaging; + +public interface IPublisher : IPublisher +{ + void Publish(TMessage message); +} diff --git a/src/Contracts/Publishers/PublishHelper.cs b/src/Contracts/Publishers/PublishHelper.cs new file mode 100644 index 0000000..e5f55e6 --- /dev/null +++ b/src/Contracts/Publishers/PublishHelper.cs @@ -0,0 +1,26 @@ +namespace ProSol.Messaging; + +public static class PublishHelper +{ + public static void Publish(TMessage message, params ISubscriber[] subscribers) + { + switch (subscribers.FirstOrDefault()) + { + case null: return; + case IPipelineSubscriber subscriber: + { + void next() => Publish(message, subscribers[1..]); + subscriber.OnNext(message, next); + return; + } + case ISubscriber final: + { + final.OnNext(message); + return; + } + } + } + + public static void Publish(TMessage message, ISubscriber subscriber) + => Publish(message, [subscriber]); +} \ No newline at end of file diff --git a/src/Contracts/Subscribers/DataSubscriber.cs b/src/Contracts/Subscribers/DataSubscriber.cs new file mode 100644 index 0000000..eaa89f6 --- /dev/null +++ b/src/Contracts/Subscribers/DataSubscriber.cs @@ -0,0 +1,20 @@ +using System.Collections.Immutable; + +namespace ProSol.Messaging; + +public sealed class DataSubscriber : ISubscriber +{ + private readonly List messages = []; + + public ImmutableArray Messages => [..messages]; + + public void OnCompleted() + { + messages.Clear(); + } + + public void OnNext(TMessage message) + { + messages.Add(message); + } +} \ No newline at end of file diff --git a/src/ISubscriber.cs b/src/Contracts/Subscribers/IPipelineSubscriber.cs similarity index 77% rename from src/ISubscriber.cs rename to src/Contracts/Subscribers/IPipelineSubscriber.cs index 1193220..f8c4c3a 100644 --- a/src/ISubscriber.cs +++ b/src/Contracts/Subscribers/IPipelineSubscriber.cs @@ -3,9 +3,9 @@ namespace ProSol.Messaging; public delegate void NextDelegate(); /// -/// An Observer/Subscriber object. +/// An Observer/Subscriber object with pipeline support. /// -public interface ISubscriber +public interface IPipelineSubscriber : ISubscriber { /// /// Represents a reaction on a message. @@ -16,9 +16,4 @@ public interface ISubscriber /// - For more details about ASP.NET pipeline, see https://learn.microsoft.com/en-us/aspnet/core/fundamentals/middleware/?view=aspnetcore-8.0 /// void OnNext(TMessage message, NextDelegate next); - - /// - /// Finalizes a subscriber. - /// - void OnCompleted(); -} +} \ No newline at end of file diff --git a/src/Contracts/Subscribers/ISubscriber.cs b/src/Contracts/Subscribers/ISubscriber.cs new file mode 100644 index 0000000..e8c703a --- /dev/null +++ b/src/Contracts/Subscribers/ISubscriber.cs @@ -0,0 +1,10 @@ +namespace ProSol.Messaging; + +public interface ISubscriber +{ + /// + /// Finalizes a subscriber. + /// + void OnCompleted(); +} + diff --git a/src/Contracts/Subscribers/ISubscriber_T.cs b/src/Contracts/Subscribers/ISubscriber_T.cs new file mode 100644 index 0000000..04a9995 --- /dev/null +++ b/src/Contracts/Subscribers/ISubscriber_T.cs @@ -0,0 +1,7 @@ +namespace ProSol.Messaging; + +public interface ISubscriber : ISubscriber +{ + void OnNext(TMessage message); +} + diff --git a/src/PipelineMessagePublisher.cs b/src/PipelineMessagePublisher.cs deleted file mode 100644 index d6f89c0..0000000 --- a/src/PipelineMessagePublisher.cs +++ /dev/null @@ -1,67 +0,0 @@ -using ProSol.Messaging.Translating; - -namespace ProSol.Messaging; - -/// -/// Collects a subscribers and publishes a messages to them. -/// Provides a pipeline mechanics, so a subscriber is able to stop message processing. -/// -/// Type of message. -public class PipelineMessagePublisher : - IPublisher, - ITranslatorPublisher -{ - private readonly HashSet> subscribers = []; - - /// - /// Subscribes a subscriber. - /// - /// An implementation of or interfaces. - /// Unsubscriber. - public IDisposable Subscribe(ISubscriber subscriber) - { - subscribers.Add(subscriber); - return new Unsubscriber(subscribers, subscriber); - } - - /// - /// Subscribes a subscriber in different message format. - /// - public IDisposable Subscribe(TranslatorBase translator) - { - return Subscribe((ISubscriber)translator); - } - - /// - /// Pushes the message to subscribers. - /// - /// A message to push to specific . - public void Publish(TMessage message) - { - PublishInternal(subscribers, message); - } - - /// - /// Pushes the `complete` message, so the subscribers can finalize the subscription. - /// - public void Complete() - { - foreach (var item in subscribers) - { - item.OnCompleted(); - } - - subscribers.Clear(); - } - - private static void PublishInternal(IEnumerable> subscribers, TMessage message) - { - if (!subscribers.Any()) - { - return; - } - - void next() => PublishInternal(subscribers.Skip(1), message); - subscribers.First().OnNext(message, next); - } -} diff --git a/src/Publishers/PipelineMessagePublisher.cs b/src/Publishers/PipelineMessagePublisher.cs new file mode 100644 index 0000000..19fc93b --- /dev/null +++ b/src/Publishers/PipelineMessagePublisher.cs @@ -0,0 +1,43 @@ +namespace ProSol.Messaging; + +/// +/// Collects a subscribers and publishes a messages to them. +/// Provides a pipeline mechanics, so a subscriber is able to stop message processing. +/// +/// Type of message. +public class PipelineMessagePublisher : + IPublisher +{ + private readonly HashSet subscribers = []; + + /// + /// Subscribes a subscriber. + /// + /// An implementation of or interfaces. + /// Unsubscriber. + public IDisposable Subscribe(ISubscriber subscriber) + { + subscribers.Add(subscriber); + return new Unsubscriber(subscribers, subscriber); + } + + /// + /// Pushes the message to subscribers. + /// + /// A message to push to specific . + public void Publish(TMessage message) + => PublishHelper.Publish(message, [.. subscribers]); + + /// + /// Pushes the `complete` message, so the subscribers can finalize the subscription. + /// + public void Complete() + { + foreach (var item in subscribers) + { + item.OnCompleted(); + } + + subscribers.Clear(); + } +} \ No newline at end of file diff --git a/src/Messaging.csproj b/src/Publishers/Publishers.csproj similarity index 51% rename from src/Messaging.csproj rename to src/Publishers/Publishers.csproj index 7f217f1..6b116be 100644 --- a/src/Messaging.csproj +++ b/src/Publishers/Publishers.csproj @@ -1,15 +1,14 @@  + + + + + + net8.0 enable enable - preview - - - - <_Parameter1>$(AssemblyName).Tests - - diff --git a/src/Publishers/Unsubscriber.cs b/src/Publishers/Unsubscriber.cs new file mode 100644 index 0000000..bd31475 --- /dev/null +++ b/src/Publishers/Unsubscriber.cs @@ -0,0 +1,20 @@ +namespace ProSol.Messaging; + +public sealed class Unsubscriber : IDisposable +{ + private readonly ICollection subscribers; + private readonly ISubscriber target; + + internal Unsubscriber( + ICollection subscribers, + ISubscriber target) + { + this.subscribers = subscribers; + this.target = target; + } + + public void Dispose() + { + subscribers.Remove(target); + } +} \ No newline at end of file diff --git a/src/Subscriptions/AllFilteredSubscription.cs b/src/Subscriptions/AllFilteredSubscription.cs new file mode 100644 index 0000000..baf1ffd --- /dev/null +++ b/src/Subscriptions/AllFilteredSubscription.cs @@ -0,0 +1,8 @@ +namespace ProSol.Messaging.Subscriptions; + +public sealed class AllFilteredSubscription(IPipelineSubscriber subscriber, params Predicate[] predicates) + : FilteredSubscriptionBase(subscriber, predicates) +{ + protected override bool CheckPredicates(IEnumerable> predicates, TMessage message) + => predicates.All(x => x(message)); +} \ No newline at end of file diff --git a/src/Subscriptions/AnyFilteredSubscription.cs b/src/Subscriptions/AnyFilteredSubscription.cs new file mode 100644 index 0000000..3ef0abe --- /dev/null +++ b/src/Subscriptions/AnyFilteredSubscription.cs @@ -0,0 +1,8 @@ +namespace ProSol.Messaging.Subscriptions; + +public sealed class AnyFilteredSubscription(IPipelineSubscriber subscriber, params Predicate[] predicates) + : FilteredSubscriptionBase(subscriber, predicates) +{ + protected override bool CheckPredicates(IEnumerable> predicates, TMessage message) + => predicates.Any(x => x(message)); +} \ No newline at end of file diff --git a/src/Subscriptions/FilteredSubscriptionBase.cs b/src/Subscriptions/FilteredSubscriptionBase.cs new file mode 100644 index 0000000..ee5545d --- /dev/null +++ b/src/Subscriptions/FilteredSubscriptionBase.cs @@ -0,0 +1,31 @@ +namespace ProSol.Messaging.Subscriptions; + +/// +/// Represents a subscription on messages with specific field states. +/// +/// A type of message. +/// A listener of a message. +/// A set of predicates which should be complied in order to process the message. +/// +/// - If predicates criteria not met - the NextDelegate is called anyway. +/// +public abstract class FilteredSubscriptionBase(IPipelineSubscriber subscriber, params Predicate[] predicates) + : SubscriptionBase(subscriber) +{ + private readonly Predicate[] predicates = predicates; + + protected sealed override void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next) + { + if (CheckPredicates(predicates, message)) + { + subscriber.OnNext(message, next); + } + else + { + // NOTE: filter mismatch should not break a pipeline. + next(); + } + } + + protected abstract bool CheckPredicates(IEnumerable> predicates, TMessage message); +} \ No newline at end of file diff --git a/src/Subscriptions/RegularSubscription.cs b/src/Subscriptions/RegularSubscription.cs new file mode 100644 index 0000000..f28481d --- /dev/null +++ b/src/Subscriptions/RegularSubscription.cs @@ -0,0 +1,11 @@ +namespace ProSol.Messaging.Subscriptions; + +/// +/// Represents a dummy wrapper around a . +/// +public class RegularSubscription(IPipelineSubscriber subscriber) + : SubscriptionBase(subscriber) +{ + protected override void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next) + => subscriber.OnNext(message, next); +} \ No newline at end of file diff --git a/src/Subscriptions/SubscriptionBase.cs b/src/Subscriptions/SubscriptionBase.cs new file mode 100644 index 0000000..b0d98d8 --- /dev/null +++ b/src/Subscriptions/SubscriptionBase.cs @@ -0,0 +1,25 @@ +namespace ProSol.Messaging.Subscriptions; + +/// +/// Represents a wrapper around , +/// which allows to make extra control of message execution +/// in child classes. +/// +/// A type of message. +/// A listener of a message. +public abstract class SubscriptionBase(IPipelineSubscriber subscriber) + : IPipelineSubscriber +{ + private readonly IPipelineSubscriber subscriber = subscriber; + + public void OnCompleted() + => OnCompleted(subscriber); + + public void OnNext(TMessage message, NextDelegate next) + => OnNext(subscriber, message, next); + + protected virtual void OnCompleted(IPipelineSubscriber subscriber) + => subscriber.OnCompleted(); + + protected abstract void OnNext(IPipelineSubscriber subscriber, TMessage message, NextDelegate next); +} diff --git a/src/Subscriptions/Subscriptions.csproj b/src/Subscriptions/Subscriptions.csproj new file mode 100644 index 0000000..13e873c --- /dev/null +++ b/src/Subscriptions/Subscriptions.csproj @@ -0,0 +1,12 @@ + + + + + + + + net8.0 + enable + enable + + diff --git a/src/Translating/ITranslatorPublisher.cs b/src/Translating/ITranslatorPublisher.cs deleted file mode 100644 index 7bcf3ca..0000000 --- a/src/Translating/ITranslatorPublisher.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace ProSol.Messaging.Translating; - -public interface ITranslatorPublisher -{ - IDisposable Subscribe(TranslatorBase translatorBase); -} \ No newline at end of file diff --git a/src/Translating/PublisherFluentExtensions.cs b/src/Translating/PublisherFluentExtensions.cs new file mode 100644 index 0000000..42b86a4 --- /dev/null +++ b/src/Translating/PublisherFluentExtensions.cs @@ -0,0 +1,13 @@ +namespace ProSol.Messaging.Translating; + +public static class IPublisherFluentExtensions +{ + public static IPublisher Translate( + this IPublisher publisher, + Func converter) + { + var result = new SimpleTranslator(converter); + var unsubscriber = publisher.Subscribe(result); + return result; + } +} diff --git a/src/Translating/SimpleTranslator.cs b/src/Translating/SimpleTranslator.cs index 767db17..287ff79 100644 --- a/src/Translating/SimpleTranslator.cs +++ b/src/Translating/SimpleTranslator.cs @@ -1,7 +1,7 @@ namespace ProSol.Messaging.Translating; -internal sealed class SimpleTranslator(ISubscriber destSubscriber, Func converter) - : TranslatorBase(destSubscriber) +public sealed class SimpleTranslator(Func converter) + : TranslatorBase { protected override TDest ConvertMessage(TSource message) => converter(message); diff --git a/src/Translating/Translating.csproj b/src/Translating/Translating.csproj new file mode 100644 index 0000000..13e873c --- /dev/null +++ b/src/Translating/Translating.csproj @@ -0,0 +1,12 @@ + + + + + + + + net8.0 + enable + enable + + diff --git a/src/Translating/TranslatorBase.cs b/src/Translating/TranslatorBase.cs index ca2521b..6361ce8 100644 --- a/src/Translating/TranslatorBase.cs +++ b/src/Translating/TranslatorBase.cs @@ -1,19 +1,20 @@ namespace ProSol.Messaging.Translating; -public abstract class TranslatorBase(ISubscriber destSubscriber) : ISubscriber +public abstract class TranslatorBase + : IPipelineSubscriber, IPublisher, IDisposable { - private readonly ISubscriber destSubscriber = destSubscriber; + private ISubscriber? destSubscriber; /// - public void OnCompleted() => destSubscriber.OnCompleted(); + public void OnCompleted() => destSubscriber?.OnCompleted(); /// public void OnNext(TSource message, NextDelegate next) { var dest = ConvertMessage(message); - if (dest != null) + if (dest != null && destSubscriber != null) { - destSubscriber.OnNext(dest, next); + PublishHelper.Publish(dest, destSubscriber); } else { @@ -31,6 +32,19 @@ public abstract class TranslatorBase(ISubscriber destSubs /// protected abstract TDest? ConvertMessage(TSource message); - public static TranslatorBase Create(ISubscriber destSubscriber, Func converter) - => new SimpleTranslator(destSubscriber, converter); + public IDisposable Subscribe(ISubscriber subscriber) + { + if (destSubscriber != null) + { + throw new InvalidOperationException("Unable to subscribe more than one subscriber."); + } + destSubscriber = subscriber; + return this; + } + + public void Dispose() + { + destSubscriber?.OnCompleted(); + destSubscriber = null; + } } diff --git a/src/Unsubscriber.cs b/src/Unsubscriber.cs deleted file mode 100644 index 7d59333..0000000 --- a/src/Unsubscriber.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace ProSol.Messaging; - -public sealed class Unsubscriber : IDisposable -{ - private readonly ICollection> subscribers; - private readonly ISubscriber target; - - internal Unsubscriber( - ICollection> subscribers, - ISubscriber target) - { - this.subscribers = subscribers; - this.target = target; - } - - public void Dispose() - { - subscribers.Remove(target); - } -} \ No newline at end of file diff --git a/tests/MessageTranslatorTests.cs b/tests/MessageTranslatorTests.cs deleted file mode 100644 index a50fc1b..0000000 --- a/tests/MessageTranslatorTests.cs +++ /dev/null @@ -1,35 +0,0 @@ -using FakeItEasy; -using ProSol.Messaging; -using ProSol.Messaging.Translating; - -namespace Messaging.Tests; - -public class MessageTranslatorTests -{ - [Test] - public void SimpleCase_OneToOne_ShouldWork() - { - // Arrange configs. - string[] input = ["Hello", "World!"]; - int[] expected = [5, 6]; - static int converter(string message) => message.Length; - List messages = []; - - // Arrange operations. - var fakeListener = A.Fake>(); - A.CallTo(() => fakeListener.OnNext(A._, A._)) - .Invokes((int i, NextDelegate next) => messages.Add(i)); - - var publisher = new PipelineMessagePublisher(); - publisher.Subscribe(TranslatorBase.Create(fakeListener, converter)); - - // Act. - foreach(var item in input) - { - publisher.Publish(item); - } - - // Assert. - Assert.That(messages, Is.EquivalentTo(expected)); - } -} \ No newline at end of file diff --git a/tests/Subscriptions/AllFilteredSubscriptionTests.cs b/tests/Subscriptions/AllFilteredSubscriptionTests.cs new file mode 100644 index 0000000..52d3aea --- /dev/null +++ b/tests/Subscriptions/AllFilteredSubscriptionTests.cs @@ -0,0 +1,41 @@ +using ProSol.Messaging.Subscriptions; + +namespace ProSol.Messaging.Tests.Subscriptions; + +public class AllFilteredSubscriptionTests +{ + private class IntSubscriber : IPipelineSubscriber + { + public List Numbers = []; + public void OnCompleted() { } + + public void OnNext(int message, NextDelegate next) + { + Numbers.Add(message); + next(); + } + } + + [Test] + public void IntSubscriber_ShouldReceive_OnlyLimitedRange() + { + Predicate[] predicates = [ + x => x >= 0, + x => x < 3 + ]; + int[] source = [-1, 0, 1, 2, 3, 4]; + int[] expected = [0, 1, 2]; + + var subscriber = new IntSubscriber(); + var subscription = new AllFilteredSubscription( + subscriber, + predicates); + + foreach (var item in source) + { + subscription.OnNext(item, () => {}); + } + + Assert.That(subscriber.Numbers.ToArray(), Is.EquivalentTo(expected)); + } +} \ No newline at end of file diff --git a/tests/Subscriptions/AnyFilteredSubscriptionTests.cs b/tests/Subscriptions/AnyFilteredSubscriptionTests.cs new file mode 100644 index 0000000..66b815d --- /dev/null +++ b/tests/Subscriptions/AnyFilteredSubscriptionTests.cs @@ -0,0 +1,51 @@ +using ProSol.Messaging.Subscriptions; + +namespace ProSol.Messaging.Tests.Subscriptions; + +public class AnyFilteredSubscriptionTests +{ + private class StringSubscriber : IPipelineSubscriber + { + public List Strings = []; + public void OnCompleted() { } + + public void OnNext(string message, NextDelegate next) + { + Strings.Add(message); + next(); + } + } + + [Test] + public void Pipeline_ShouldStop_OnAny_Keyword() + { + Predicate[] predicates = [ + x => x.Contains("ERROR"), + x => x.Contains("CRITICAL") + ]; + + string[] source = [ + "quick brown fox", + "introduced an ERROR", + "so the lazy dog", + "got a CRITICAL message" + ]; + + string[] expected = [ + "introduced an ERROR", + "got a CRITICAL message" + ]; + + var subscriber = new StringSubscriber(); + var subscription = new AnyFilteredSubscription( + subscriber, + predicates); + + foreach (var item in source) + { + subscription.OnNext(item, () => {}); + } + + Assert.That(subscriber.Strings.ToArray(), Is.EquivalentTo(expected)); + } +} \ No newline at end of file diff --git a/tests/GlobalUsings.cs b/tests/Subscriptions/GlobalUsings.cs similarity index 100% rename from tests/GlobalUsings.cs rename to tests/Subscriptions/GlobalUsings.cs diff --git a/tests/Messaging.Tests.csproj b/tests/Subscriptions/Subscriptions.Tests.csproj similarity index 82% rename from tests/Messaging.Tests.csproj rename to tests/Subscriptions/Subscriptions.Tests.csproj index cbda234..991c21e 100644 --- a/tests/Messaging.Tests.csproj +++ b/tests/Subscriptions/Subscriptions.Tests.csproj @@ -10,7 +10,6 @@ - @@ -19,7 +18,7 @@ - + diff --git a/tests/Translating/GlobalUsings.cs b/tests/Translating/GlobalUsings.cs new file mode 100644 index 0000000..cefced4 --- /dev/null +++ b/tests/Translating/GlobalUsings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; \ No newline at end of file diff --git a/tests/Translating/MessageTranslatorTests.cs b/tests/Translating/MessageTranslatorTests.cs new file mode 100644 index 0000000..ea787e0 --- /dev/null +++ b/tests/Translating/MessageTranslatorTests.cs @@ -0,0 +1,51 @@ +using ProSol.Messaging; +using ProSol.Messaging.Translating; + +namespace Messaging.Tests; + +public class MessageTranslatorTests +{ + [Test] + public void Translating_MultipleChains_ShouldWork() + { + // Arrange configs. + string[] input = ["Hello", "World!"]; + bool[] expected = [false, true]; + + // Arrange operations. + var dataSubscriber = new DataSubscriber(); + + var publisher = new TestPublisher(); + using var unsub = publisher + .Translate(x => x.Length) + .Translate(x => x > 5) + .Subscribe(dataSubscriber); + + // Act. + foreach(var item in input) + { + publisher.Publish(item); + } + + // Assert. + Assert.That(dataSubscriber.Messages, Is.EquivalentTo(expected)); + } + + public class TestPublisher : IPublisher + { + private ISubscriber? subscriber; + + public void Publish(TData message) + { + PublishHelper.Publish(message, subscriber!); + } + + public IDisposable Subscribe(ISubscriber subscriber) + { + this.subscriber = subscriber; + #pragma warning disable CS8603 + return null; + #pragma warning restore CS8603 + } + } +} \ No newline at end of file diff --git a/tests/Translating/Translating.Tests.csproj b/tests/Translating/Translating.Tests.csproj new file mode 100644 index 0000000..8704e38 --- /dev/null +++ b/tests/Translating/Translating.Tests.csproj @@ -0,0 +1,24 @@ + + + + net8.0 + enable + enable + + false + true + + + + + + + + + + + + + + +