Add async_manual_reset_event.

This commit is contained in:
Lewis Baker 2017-07-25 07:10:50 +09:30
parent 0526441579
commit 2af1b18c0f
6 changed files with 376 additions and 3 deletions

View File

@ -14,7 +14,7 @@ These include:
* Awaitable Types
* `single_consumer_event`
* `async_mutex`
* `async_manual_reset_event` (coming)
* `async_manual_reset_event`
* `async_auto_reset_event`
* Functions
* `when_all()` (coming)
@ -754,9 +754,90 @@ cppcoro::task<> add_item(std::string value)
}
```
## `async_manual_reset_event`
A manual-reset event is a coroutine/thread-synchronisation primitive that allows one or more threads
to wait until the event is signalled by a thread that calls `set()`.
The event is in one of two states; *'set'* and *'not set'*.
If the event is in the *'set'* state when a coroutine awaits the event then the coroutine
continues without suspending. However if the coroutine is in the *'not set'* state then the
coroutine is suspended until some thread subsequently calls the `set()` method.
Any threads that were suspended while waiting for the event to become *'set'* will be resumed
inside the next call to `set()` by some thread.
Note that you must ensure that no coroutines are awaiting a *'not set'* event when the
event is destructed as they will not be resumed.
Example:
```c++
cppcoro::async_manual_reset_event event;
std::string value;
void producer()
{
value = get_some_string_value();
// Publish a value by setting the event.
event.set();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value(value);
}
```
API Summary:
```c++
namespace cppcoro
{
class async_manual_reset_event_operation;
class async_manual_reset_event
{
public:
async_manual_reset_event(bool initiallySet = false) noexcept;
~async_manual_reset_event();
async_manual_reset_event(const async_manual_reset_event&) = delete;
async_manual_reset_event(async_manual_reset_event&&) = delete;
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;
// Wait until the event becomes set.
<unspecified> operator co_await() const noexcept;
bool is_set() const noexcept;
void set() noexcept;
void reset() noexcept;
};
class async_manual_reset_event
{
public:
async_manual_reset_event_operation(async_manual_reset_event& event) noexcept;
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
void await_resume() const noexcept;
};
}
```
## `async_auto_reset_event`
An auto-reset event is a coroutine/thread-synchronisation abstraction that allows one or more threads
An auto-reset event is a coroutine/thread-synchronisation primitive that allows one or more threads
to wait until the event is signalled by a thread by calling `set()`.
Once a coroutine that is awaiting the event is released by either a prior or subsequent call to `set()`

View File

@ -0,0 +1,104 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_ASYNC_MANUAL_RESET_EVENT_HPP_INCLUDED
#define CPPCORO_ASYNC_MANUAL_RESET_EVENT_HPP_INCLUDED
#include <experimental/coroutine>
#include <atomic>
#include <cstdint>
namespace cppcoro
{
class async_manual_reset_event_operation;
/// An async manual-reset event is a coroutine synchronisation abstraction
/// that allows one or more coroutines to wait until some thread calls
/// set() on the event.
///
/// When a coroutine awaits a 'set' event the coroutine continues without
/// suspending. Otherwise, if it awaits a 'not set' event the coroutine is
/// suspended and is later resumed inside the call to 'set()'.
///
/// \seealso async_auto_reset_event
class async_manual_reset_event
{
public:
/// Initialise the event to either 'set' or 'not set' state.
///
/// \param initiallySet
/// If 'true' then initialises the event to the 'set' state, otherwise
/// initialises the event to the 'not set' state.
async_manual_reset_event(bool initiallySet = false) noexcept;
~async_manual_reset_event();
/// Wait for the event to enter the 'set' state.
///
/// If the event is already 'set' then the coroutine continues without
/// suspending.
///
/// Otherwise, the coroutine is suspended and later resumed when some
/// thread calls 'set()'. The coroutine will be resumed inside the next
/// call to 'set()'.
async_manual_reset_event_operation operator co_await() const noexcept;
/// Query if the event is currently in the 'set' state.
bool is_set() const noexcept;
/// Set the state of the event to 'set'.
///
/// If there are pending coroutines awaiting the event then all
/// pending coroutines are resumed within this call.
/// Any coroutines that subsequently await the event will continue
/// without suspending.
///
/// This operation is a no-op if the event was already 'set'.
void set() noexcept;
/// Set the state of the event to 'not-set'.
///
/// Any coroutines that subsequently await the event will suspend
/// until some thread calls 'set()'.
///
/// This is a no-op if the state was already 'not set'.
void reset() noexcept;
private:
friend class async_manual_reset_event_operation;
// This variable has 3 states:
// - this - The state is 'set'.
// - nullptr - The state is 'not set' with no waiters.
// - other - The state is 'not set'.
// Points to an 'async_manual_reset_event_operation' that is
// the head of a linked-list of waiters.
mutable std::atomic<void*> m_state;
};
class async_manual_reset_event_operation
{
public:
explicit async_manual_reset_event_operation(const async_manual_reset_event& event) noexcept;
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
void await_resume() const noexcept {}
private:
friend class async_manual_reset_event;
const async_manual_reset_event& m_event;
async_manual_reset_event_operation* m_next;
std::experimental::coroutine_handle<> m_awaiter;
};
}
#endif

View File

@ -0,0 +1,99 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#include <cppcoro/async_manual_reset_event.hpp>
#include <cppcoro/config.hpp>
#include <cassert>
cppcoro::async_manual_reset_event::async_manual_reset_event(bool initiallySet) noexcept
: m_state(initiallySet ? static_cast<void*>(this) : nullptr)
{}
cppcoro::async_manual_reset_event::~async_manual_reset_event()
{
// There should be no coroutines still awaiting the event.
assert(
m_state.load(std::memory_order_relaxed) == nullptr ||
m_state.load(std::memory_order_relaxed) == static_cast<void*>(this));
}
bool cppcoro::async_manual_reset_event::is_set() const noexcept
{
return m_state.load(std::memory_order_acquire) == static_cast<const void*>(this);
}
cppcoro::async_manual_reset_event_operation
cppcoro::async_manual_reset_event::operator co_await() const noexcept
{
return async_manual_reset_event_operation{ *this };
}
void cppcoro::async_manual_reset_event::set() noexcept
{
void* const setState = static_cast<void*>(this);
// Needs 'release' semantics so that prior writes are visible to event awaiters
// that synchronise either via 'is_set()' or 'operator co_await()'.
// Needs 'acquire' semantics in case there are any waiters so that we see
// prior writes to the waiting coroutine's state and to the contents of
// the queued async_manual_reset_event_operation objects.
void* oldState = m_state.exchange(setState, std::memory_order_acq_rel);
if (oldState != setState)
{
auto* current = static_cast<async_manual_reset_event_operation*>(oldState);
while (current != nullptr)
{
auto* next = current->m_next;
current->m_awaiter.resume();
current = next;
}
}
}
void cppcoro::async_manual_reset_event::reset() noexcept
{
void* oldState = static_cast<void*>(this);
m_state.compare_exchange_strong(oldState, nullptr, std::memory_order_relaxed);
}
cppcoro::async_manual_reset_event_operation::async_manual_reset_event_operation(
const async_manual_reset_event& event) noexcept
: m_event(event)
{
}
bool cppcoro::async_manual_reset_event_operation::await_ready() const noexcept
{
return m_event.is_set();
}
bool cppcoro::async_manual_reset_event_operation::await_suspend(
std::experimental::coroutine_handle<> awaiter) noexcept
{
m_awaiter = awaiter;
const void* const setState = static_cast<const void*>(&m_event);
void* oldState = m_event.m_state.load(std::memory_order_acquire);
do
{
if (oldState == setState)
{
// State is now 'set' no need to suspend.
return false;
}
m_next = static_cast<async_manual_reset_event_operation*>(oldState);
} while (!m_event.m_state.compare_exchange_weak(
oldState,
static_cast<void*>(this),
std::memory_order_release,
std::memory_order_acquire));
// Successfully queued this waiter to the list.
return true;
}

View File

@ -9,6 +9,7 @@ from cake.tools import compiler, script, env, project, variant
includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [
'async_auto_reset_event.hpp',
'async_manual_reset_event.hpp',
'async_generator.hpp',
'async_mutex.hpp',
'broken_promise.hpp',
@ -47,6 +48,7 @@ privateHeaders = script.cwd([
sources = script.cwd([
'async_auto_reset_event.cpp',
'async_manual_reset_event.cpp',
'async_mutex.cpp',
'cancellation_state.cpp',
'cancellation_token.cpp',

View File

@ -0,0 +1,86 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#include <cppcoro/async_manual_reset_event.hpp>
#include <cppcoro/task.hpp>
#include "doctest/doctest.h"
TEST_SUITE_BEGIN("async_manual_reset_event");
TEST_CASE("default constructor initially not set")
{
cppcoro::async_manual_reset_event event;
CHECK(!event.is_set());
}
TEST_CASE("construct event initially set")
{
cppcoro::async_manual_reset_event event{ true };
CHECK(event.is_set());
}
TEST_CASE("set and reset")
{
cppcoro::async_manual_reset_event event;
CHECK(!event.is_set());
event.set();
CHECK(event.is_set());
event.set();
CHECK(event.is_set());
event.reset();
CHECK(!event.is_set());
event.reset();
CHECK(!event.is_set());
event.set();
CHECK(event.is_set());
}
TEST_CASE("await not set event")
{
cppcoro::async_manual_reset_event event;
auto createWaiter = [&]() -> cppcoro::task<>
{
co_await event;
};
auto t1 = createWaiter();
auto t2 = createWaiter();
CHECK(!t1.is_ready());
CHECK(!t2.is_ready());
event.reset();
CHECK(!t1.is_ready());
CHECK(!t2.is_ready());
event.set();
CHECK(t1.is_ready());
CHECK(t2.is_ready());
}
TEST_CASE("awaiting already set event doesn't suspend")
{
cppcoro::async_manual_reset_event event{ true };
auto createWaiter = [&]() -> cppcoro::task<>
{
co_await event;
};
auto t1 = createWaiter();
CHECK(t1.is_ready());
auto t2 = createWaiter();
CHECK(t2.is_ready());
}
TEST_SUITE_END();

View File

@ -21,8 +21,9 @@ sources = script.cwd([
'generator_tests.cpp',
'recursive_generator_tests.cpp',
'async_generator_tests.cpp',
'async_mutex_tests.cpp',
'async_auto_reset_event_tests.cpp',
'async_manual_reset_event_tests.cpp',
'async_mutex_tests.cpp',
'cancellation_token_tests.cpp',
'lazy_task_tests.cpp',
'shared_lazy_task_tests.cpp',