diff --git a/README.md b/README.md index 9f12267..deb4289 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ These include: * `async_mutex` * `async_manual_reset_event` * `async_auto_reset_event` + * `async_latch` * Functions * `sync_wait()` * `when_all()` @@ -897,6 +898,46 @@ namespace cppcoro } ``` +## `async_latch` + +An async latch is a synchronisation primitive that allows coroutines to asynchronously +wait until a counter has been decremented to zero. + +The latch is a single-use object. Once the counter reaches zero the latch becomes 'ready' +and will remain ready until the latch is destroyed. + +API Summary: +```c++ +// +namespace cppcoro +{ + class async_latch + { + public: + + // Initialise the latch with the specified count. + async_latch(std::ptrdiff_t initialCount) noexcept; + + // Query if the count has reached zero yet. + bool is_ready() const noexcept; + + // Decrement the count by n. + // This will resume any waiting coroutines if the count reaches zero + // as a result of this call. + // It is undefined behaviour to decrement the count below zero. + void count_down(std::ptrdiff_t n = 1) noexcept; + + // Wait until the latch becomes ready. + // If the latch count is not yet zero then the awaiting coroutine will + // be suspended and later resumed by a call to count_down() that decrements + // the count to zero. If the latch count was already zero then the coroutine + // continues without suspending. + Awaiter operator co_await() const noexcept; + + }; +} +``` + ## `cancellation_token` A `cancellation_token` is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function. diff --git a/include/cppcoro/async_latch.hpp b/include/cppcoro/async_latch.hpp new file mode 100644 index 0000000..9dfbb50 --- /dev/null +++ b/include/cppcoro/async_latch.hpp @@ -0,0 +1,75 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Lewis Baker +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef CPPCORO_ASYNC_LATCH_HPP_INCLUDED +#define CPPCORO_ASYNC_LATCH_HPP_INCLUDED + +#include + +#include +#include + +namespace cppcoro +{ + class async_latch + { + public: + + /// Construct the latch with the specified initial count. + /// + /// \param initialCount + /// The initial count of the latch. The latch will become signalled once + /// \c this->count_down() has been called \p initialCount times. + /// The latch will be immediately signalled on construction if this + /// parameter is zero or negative. + async_latch(std::ptrdiff_t initialCount) noexcept + : m_count(initialCount) + , m_event(initialCount <= 0) + {} + + /// Query if the latch has become signalled. + /// + /// The latch is marked as signalled once the count reaches zero. + bool is_ready() const noexcept { return m_event.is_set(); } + + /// Decrement the count by n. + /// + /// Any coroutines awaiting this latch will be resumed once the count + /// reaches zero. ie. when this method has been called at least 'initialCount' + /// times. + /// + /// Any awaiting coroutines that are currently suspended waiting for the + /// latch to become signalled will be resumed inside the last call to this + /// method (ie. the call that decrements the count to zero). + /// + /// \param n + /// The amount to decrement the count by. + void count_down(std::ptrdiff_t n = 1) noexcept + { + if (m_count.fetch_sub(n, std::memory_order_acq_rel) <= n) + { + m_event.set(); + } + } + + /// Allows the latch to be awaited within a coroutine. + /// + /// If the latch is already signalled (ie. the count has been decremented + /// to zero) then the awaiting coroutine will continue without suspending. + /// Otherwise, the coroutine will suspend and will later be resumed inside + /// a call to `count_down()`. + auto operator co_await() const noexcept + { + return m_event.operator co_await(); + } + + private: + + std::atomic m_count; + async_manual_reset_event m_event; + + }; +} + +#endif diff --git a/lib/build.cake b/lib/build.cake index 04138d7..a7dd236 100644 --- a/lib/build.cake +++ b/lib/build.cake @@ -12,6 +12,7 @@ includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [ 'async_manual_reset_event.hpp', 'async_generator.hpp', 'async_mutex.hpp', + 'async_latch.hpp', 'broken_promise.hpp', 'cancellation_registration.hpp', 'cancellation_source.hpp', diff --git a/test/async_latch_tests.cpp b/test/async_latch_tests.cpp new file mode 100644 index 0000000..a90a7ba --- /dev/null +++ b/test/async_latch_tests.cpp @@ -0,0 +1,113 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Lewis Baker +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include +#include + +#include +#include "doctest/doctest.h" + +TEST_SUITE_BEGIN("async_latch"); + +using namespace cppcoro; + +TEST_CASE("latch constructed with zero count is initially ready") +{ + async_latch latch(0); + CHECK(latch.is_ready()); +} + +TEST_CASE("latch constructed with negative count is initially ready") +{ + async_latch latch(-3); + CHECK(latch.is_ready()); +} + +TEST_CASE("count_down and is_ready") +{ + async_latch latch(3); + CHECK(!latch.is_ready()); + latch.count_down(); + CHECK(!latch.is_ready()); + latch.count_down(); + CHECK(!latch.is_ready()); + latch.count_down(); + CHECK(latch.is_ready()); +} + +TEST_CASE("count_down by n") +{ + async_latch latch(5); + latch.count_down(3); + CHECK(!latch.is_ready()); + latch.count_down(2); + CHECK(latch.is_ready()); +} + +TEST_CASE("single awaiter") +{ + async_latch latch(2); + bool after = false; + sync_wait(when_all_ready( + [&]() -> task<> + { + co_await latch; + after = true; + }(), + [&]() -> task<> + { + CHECK(!after); + latch.count_down(); + CHECK(!after); + latch.count_down(); + CHECK(after); + co_return; + }() + )); +} + +TEST_CASE("multiple awaiters") +{ + async_latch latch(2); + bool after1 = false; + bool after2 = false; + bool after3 = false; + sync_wait(when_all_ready( + [&]() -> task<> + { + co_await latch; + after1 = true; + }(), + [&]() -> task<> + { + co_await latch; + after2 = true; + }(), + [&]() -> task<> + { + co_await latch; + after3 = true; + }(), + [&]() -> task<> + { + CHECK(!after1); + CHECK(!after2); + CHECK(!after3); + latch.count_down(); + CHECK(!after1); + CHECK(!after2); + CHECK(!after3); + latch.count_down(); + CHECK(after1); + CHECK(after2); + CHECK(after3); + co_return; + }())); +} + +TEST_SUITE_END(); diff --git a/test/build.cake b/test/build.cake index f31f822..40029fb 100644 --- a/test/build.cake +++ b/test/build.cake @@ -25,6 +25,7 @@ sources = script.cwd([ 'async_auto_reset_event_tests.cpp', 'async_manual_reset_event_tests.cpp', 'async_mutex_tests.cpp', + 'async_latch_tests.cpp', 'cancellation_token_tests.cpp', 'task_tests.cpp', 'shared_task_tests.cpp',