Add cppcoro::async_latch.
This commit is contained in:
parent
c975586b44
commit
b0eacb719e
41
README.md
41
README.md
|
@ -15,6 +15,7 @@ These include:
|
||||||
* `async_mutex`
|
* `async_mutex`
|
||||||
* `async_manual_reset_event`
|
* `async_manual_reset_event`
|
||||||
* `async_auto_reset_event`
|
* `async_auto_reset_event`
|
||||||
|
* `async_latch`
|
||||||
* Functions
|
* Functions
|
||||||
* `sync_wait()`
|
* `sync_wait()`
|
||||||
* `when_all()`
|
* `when_all()`
|
||||||
|
@ -897,6 +898,46 @@ namespace cppcoro
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## `async_latch`
|
||||||
|
|
||||||
|
An async latch is a synchronisation primitive that allows coroutines to asynchronously
|
||||||
|
wait until a counter has been decremented to zero.
|
||||||
|
|
||||||
|
The latch is a single-use object. Once the counter reaches zero the latch becomes 'ready'
|
||||||
|
and will remain ready until the latch is destroyed.
|
||||||
|
|
||||||
|
API Summary:
|
||||||
|
```c++
|
||||||
|
// <cppcoro/async_latch.hpp>
|
||||||
|
namespace cppcoro
|
||||||
|
{
|
||||||
|
class async_latch
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
// Initialise the latch with the specified count.
|
||||||
|
async_latch(std::ptrdiff_t initialCount) noexcept;
|
||||||
|
|
||||||
|
// Query if the count has reached zero yet.
|
||||||
|
bool is_ready() const noexcept;
|
||||||
|
|
||||||
|
// Decrement the count by n.
|
||||||
|
// This will resume any waiting coroutines if the count reaches zero
|
||||||
|
// as a result of this call.
|
||||||
|
// It is undefined behaviour to decrement the count below zero.
|
||||||
|
void count_down(std::ptrdiff_t n = 1) noexcept;
|
||||||
|
|
||||||
|
// Wait until the latch becomes ready.
|
||||||
|
// If the latch count is not yet zero then the awaiting coroutine will
|
||||||
|
// be suspended and later resumed by a call to count_down() that decrements
|
||||||
|
// the count to zero. If the latch count was already zero then the coroutine
|
||||||
|
// continues without suspending.
|
||||||
|
Awaiter<void> operator co_await() const noexcept;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## `cancellation_token`
|
## `cancellation_token`
|
||||||
|
|
||||||
A `cancellation_token` is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function.
|
A `cancellation_token` is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function.
|
||||||
|
|
75
include/cppcoro/async_latch.hpp
Normal file
75
include/cppcoro/async_latch.hpp
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Copyright (c) Lewis Baker
|
||||||
|
// Licenced under MIT license. See LICENSE.txt for details.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
#ifndef CPPCORO_ASYNC_LATCH_HPP_INCLUDED
|
||||||
|
#define CPPCORO_ASYNC_LATCH_HPP_INCLUDED
|
||||||
|
|
||||||
|
#include <cppcoro/async_manual_reset_event.hpp>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
namespace cppcoro
|
||||||
|
{
|
||||||
|
class async_latch
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
/// Construct the latch with the specified initial count.
|
||||||
|
///
|
||||||
|
/// \param initialCount
|
||||||
|
/// The initial count of the latch. The latch will become signalled once
|
||||||
|
/// \c this->count_down() has been called \p initialCount times.
|
||||||
|
/// The latch will be immediately signalled on construction if this
|
||||||
|
/// parameter is zero or negative.
|
||||||
|
async_latch(std::ptrdiff_t initialCount) noexcept
|
||||||
|
: m_count(initialCount)
|
||||||
|
, m_event(initialCount <= 0)
|
||||||
|
{}
|
||||||
|
|
||||||
|
/// Query if the latch has become signalled.
|
||||||
|
///
|
||||||
|
/// The latch is marked as signalled once the count reaches zero.
|
||||||
|
bool is_ready() const noexcept { return m_event.is_set(); }
|
||||||
|
|
||||||
|
/// Decrement the count by n.
|
||||||
|
///
|
||||||
|
/// Any coroutines awaiting this latch will be resumed once the count
|
||||||
|
/// reaches zero. ie. when this method has been called at least 'initialCount'
|
||||||
|
/// times.
|
||||||
|
///
|
||||||
|
/// Any awaiting coroutines that are currently suspended waiting for the
|
||||||
|
/// latch to become signalled will be resumed inside the last call to this
|
||||||
|
/// method (ie. the call that decrements the count to zero).
|
||||||
|
///
|
||||||
|
/// \param n
|
||||||
|
/// The amount to decrement the count by.
|
||||||
|
void count_down(std::ptrdiff_t n = 1) noexcept
|
||||||
|
{
|
||||||
|
if (m_count.fetch_sub(n, std::memory_order_acq_rel) <= n)
|
||||||
|
{
|
||||||
|
m_event.set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Allows the latch to be awaited within a coroutine.
|
||||||
|
///
|
||||||
|
/// If the latch is already signalled (ie. the count has been decremented
|
||||||
|
/// to zero) then the awaiting coroutine will continue without suspending.
|
||||||
|
/// Otherwise, the coroutine will suspend and will later be resumed inside
|
||||||
|
/// a call to `count_down()`.
|
||||||
|
auto operator co_await() const noexcept
|
||||||
|
{
|
||||||
|
return m_event.operator co_await();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
std::atomic<std::ptrdiff_t> m_count;
|
||||||
|
async_manual_reset_event m_event;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
|
@ -12,6 +12,7 @@ includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [
|
||||||
'async_manual_reset_event.hpp',
|
'async_manual_reset_event.hpp',
|
||||||
'async_generator.hpp',
|
'async_generator.hpp',
|
||||||
'async_mutex.hpp',
|
'async_mutex.hpp',
|
||||||
|
'async_latch.hpp',
|
||||||
'broken_promise.hpp',
|
'broken_promise.hpp',
|
||||||
'cancellation_registration.hpp',
|
'cancellation_registration.hpp',
|
||||||
'cancellation_source.hpp',
|
'cancellation_source.hpp',
|
||||||
|
|
113
test/async_latch_tests.cpp
Normal file
113
test/async_latch_tests.cpp
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Copyright (c) Lewis Baker
|
||||||
|
// Licenced under MIT license. See LICENSE.txt for details.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include <cppcoro/async_latch.hpp>
|
||||||
|
#include <cppcoro/single_consumer_event.hpp>
|
||||||
|
#include <cppcoro/task.hpp>
|
||||||
|
#include <cppcoro/when_all_ready.hpp>
|
||||||
|
#include <cppcoro/sync_wait.hpp>
|
||||||
|
|
||||||
|
#include <ostream>
|
||||||
|
#include "doctest/doctest.h"
|
||||||
|
|
||||||
|
TEST_SUITE_BEGIN("async_latch");
|
||||||
|
|
||||||
|
using namespace cppcoro;
|
||||||
|
|
||||||
|
TEST_CASE("latch constructed with zero count is initially ready")
|
||||||
|
{
|
||||||
|
async_latch latch(0);
|
||||||
|
CHECK(latch.is_ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("latch constructed with negative count is initially ready")
|
||||||
|
{
|
||||||
|
async_latch latch(-3);
|
||||||
|
CHECK(latch.is_ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("count_down and is_ready")
|
||||||
|
{
|
||||||
|
async_latch latch(3);
|
||||||
|
CHECK(!latch.is_ready());
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(!latch.is_ready());
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(!latch.is_ready());
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(latch.is_ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("count_down by n")
|
||||||
|
{
|
||||||
|
async_latch latch(5);
|
||||||
|
latch.count_down(3);
|
||||||
|
CHECK(!latch.is_ready());
|
||||||
|
latch.count_down(2);
|
||||||
|
CHECK(latch.is_ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("single awaiter")
|
||||||
|
{
|
||||||
|
async_latch latch(2);
|
||||||
|
bool after = false;
|
||||||
|
sync_wait(when_all_ready(
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
co_await latch;
|
||||||
|
after = true;
|
||||||
|
}(),
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
CHECK(!after);
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(!after);
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(after);
|
||||||
|
co_return;
|
||||||
|
}()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("multiple awaiters")
|
||||||
|
{
|
||||||
|
async_latch latch(2);
|
||||||
|
bool after1 = false;
|
||||||
|
bool after2 = false;
|
||||||
|
bool after3 = false;
|
||||||
|
sync_wait(when_all_ready(
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
co_await latch;
|
||||||
|
after1 = true;
|
||||||
|
}(),
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
co_await latch;
|
||||||
|
after2 = true;
|
||||||
|
}(),
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
co_await latch;
|
||||||
|
after3 = true;
|
||||||
|
}(),
|
||||||
|
[&]() -> task<>
|
||||||
|
{
|
||||||
|
CHECK(!after1);
|
||||||
|
CHECK(!after2);
|
||||||
|
CHECK(!after3);
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(!after1);
|
||||||
|
CHECK(!after2);
|
||||||
|
CHECK(!after3);
|
||||||
|
latch.count_down();
|
||||||
|
CHECK(after1);
|
||||||
|
CHECK(after2);
|
||||||
|
CHECK(after3);
|
||||||
|
co_return;
|
||||||
|
}()));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_SUITE_END();
|
|
@ -25,6 +25,7 @@ sources = script.cwd([
|
||||||
'async_auto_reset_event_tests.cpp',
|
'async_auto_reset_event_tests.cpp',
|
||||||
'async_manual_reset_event_tests.cpp',
|
'async_manual_reset_event_tests.cpp',
|
||||||
'async_mutex_tests.cpp',
|
'async_mutex_tests.cpp',
|
||||||
|
'async_latch_tests.cpp',
|
||||||
'cancellation_token_tests.cpp',
|
'cancellation_token_tests.cpp',
|
||||||
'task_tests.cpp',
|
'task_tests.cpp',
|
||||||
'shared_task_tests.cpp',
|
'shared_task_tests.cpp',
|
||||||
|
|
Loading…
Reference in a new issue