Add async_auto_reset_event.
This commit is contained in:
parent
be43f4ac7e
commit
4b20a243b2
70
README.md
70
README.md
|
@ -15,6 +15,7 @@ These include:
|
||||||
* `single_consumer_event`
|
* `single_consumer_event`
|
||||||
* `async_mutex`
|
* `async_mutex`
|
||||||
* `async_manual_reset_event` (coming)
|
* `async_manual_reset_event` (coming)
|
||||||
|
* `async_auto_reset_event`
|
||||||
* Functions
|
* Functions
|
||||||
* `when_all()` (coming)
|
* `when_all()` (coming)
|
||||||
* Cancellation
|
* Cancellation
|
||||||
|
@ -753,6 +754,75 @@ cppcoro::task<> add_item(std::string value)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## `async_auto_reset_event`
|
||||||
|
|
||||||
|
An auto-reset event is a coroutine/thread-synchronisation abstraction that allows one or more threads
|
||||||
|
to wait until the event is signalled by a thread by calling `set()`.
|
||||||
|
|
||||||
|
Once a coroutine that is awaiting the event is released by either a prior or subsequent call to `set()`
|
||||||
|
the event is automatically reset back to the 'not set' state.
|
||||||
|
|
||||||
|
API Summary:
|
||||||
|
```c++
|
||||||
|
// <cppcoro/async_auto_reset_event.hpp>
|
||||||
|
namespace cppcoro
|
||||||
|
{
|
||||||
|
class async_auto_reset_event_operation;
|
||||||
|
|
||||||
|
class async_auto_reset_event
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
async_auto_reset_event(bool initiallySet = false) noexcept;
|
||||||
|
|
||||||
|
~async_auto_reset_event();
|
||||||
|
|
||||||
|
async_auto_reset_event(const async_auto_reset_event&) = delete;
|
||||||
|
async_auto_reset_event(async_auto_reset_event&&) = delete;
|
||||||
|
async_auto_reset_event& operator=(const async_auto_reset_event&) = delete;
|
||||||
|
async_auto_reset_event& operator=(async_auto_reset_event&&) = delete;
|
||||||
|
|
||||||
|
// Wait for the event to enter the 'set' state.
|
||||||
|
//
|
||||||
|
// If the event is already 'set' then the event is set to the 'not set'
|
||||||
|
// state and the awaiting coroutine continues without suspending.
|
||||||
|
// Otherwise, the coroutine is suspended and later resumed when some
|
||||||
|
// thread calls 'set()'.
|
||||||
|
//
|
||||||
|
// Note that the coroutine may be resumed inside a call to 'set()'
|
||||||
|
// or inside another thread's call to 'operator co_await()'.
|
||||||
|
async_auto_reset_event_operation operator co_await() const noexcept;
|
||||||
|
|
||||||
|
// Set the state of the event to 'set'.
|
||||||
|
//
|
||||||
|
// If there are pending coroutines awaiting the event then one
|
||||||
|
// pending coroutine is resumed and the state is immediately
|
||||||
|
// set back to the 'not set' state.
|
||||||
|
//
|
||||||
|
// This operation is a no-op if the event was already 'set'.
|
||||||
|
void set() noexcept;
|
||||||
|
|
||||||
|
// Set the state of the event to 'not-set'.
|
||||||
|
//
|
||||||
|
// This is a no-op if the state was already 'not set'.
|
||||||
|
void reset() noexcept;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
class async_auto_reset_event_operation
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit async_auto_reset_event_operation(async_auto_reset_event& event) noexcept;
|
||||||
|
async_auto_reset_event_operation(const async_auto_reset_event_operation& other) noexcept;
|
||||||
|
|
||||||
|
bool await_ready() const noexcept;
|
||||||
|
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
|
||||||
|
void await_resume() const noexcept;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## `cancellation_token`
|
## `cancellation_token`
|
||||||
|
|
||||||
A `cancellation_token` is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function.
|
A `cancellation_token` is a value that can be passed to a function that allows the caller to subsequently communicate a request to cancel the operation to that function.
|
||||||
|
|
98
include/cppcoro/async_auto_reset_event.hpp
Normal file
98
include/cppcoro/async_auto_reset_event.hpp
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Copyright (c) Lewis Baker
|
||||||
|
// Licenced under MIT license. See LICENSE.txt for details.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
#ifndef CPPCORO_ASYNC_AUTO_RESET_EVENT_HPP_INCLUDED
|
||||||
|
#define CPPCORO_ASYNC_AUTO_RESET_EVENT_HPP_INCLUDED
|
||||||
|
|
||||||
|
#include <experimental/coroutine>
|
||||||
|
#include <atomic>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
namespace cppcoro
|
||||||
|
{
|
||||||
|
class async_auto_reset_event_operation;
|
||||||
|
|
||||||
|
/// An async auto-reset event is a coroutine synchronisation abstraction
|
||||||
|
/// that allows one or more coroutines to wait until some thread calls
|
||||||
|
/// set() on the event.
|
||||||
|
///
|
||||||
|
/// When a coroutine awaits a 'set' event the event is automatically
|
||||||
|
/// reset back to the 'not set' state, thus the name 'auto reset' event.
|
||||||
|
class async_auto_reset_event
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
/// Initialise the event to either 'set' or 'not set' state.
|
||||||
|
async_auto_reset_event(bool initiallySet = false) noexcept;
|
||||||
|
|
||||||
|
~async_auto_reset_event();
|
||||||
|
|
||||||
|
/// Wait for the event to enter the 'set' state.
|
||||||
|
///
|
||||||
|
/// If the event is already 'set' then the event is set to the 'not set'
|
||||||
|
/// state and the awaiting coroutine continues without suspending.
|
||||||
|
/// Otherwise, the coroutine is suspended and later resumed when some
|
||||||
|
/// thread calls 'set()'.
|
||||||
|
///
|
||||||
|
/// Note that the coroutine may be resumed inside a call to 'set()'
|
||||||
|
/// or inside another thread's call to 'operator co_await()'.
|
||||||
|
async_auto_reset_event_operation operator co_await() const noexcept;
|
||||||
|
|
||||||
|
/// Set the state of the event to 'set'.
|
||||||
|
///
|
||||||
|
/// If there are pending coroutines awaiting the event then one
|
||||||
|
/// pending coroutine is resumed and the state is immediately
|
||||||
|
/// set back to the 'not set' state.
|
||||||
|
///
|
||||||
|
/// This operation is a no-op if the event was already 'set'.
|
||||||
|
void set() noexcept;
|
||||||
|
|
||||||
|
/// Set the state of the event to 'not-set'.
|
||||||
|
///
|
||||||
|
/// This is a no-op if the state was already 'not set'.
|
||||||
|
void reset() noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
friend class async_auto_reset_event_operation;
|
||||||
|
|
||||||
|
void resume_waiters(std::uint64_t initialState) const noexcept;
|
||||||
|
|
||||||
|
// Bits 0-31 - Set count
|
||||||
|
// Bits 32-63 - Waiter count
|
||||||
|
mutable std::atomic<std::uint64_t> m_state;
|
||||||
|
|
||||||
|
mutable std::atomic<async_auto_reset_event_operation*> m_newWaiters;
|
||||||
|
|
||||||
|
mutable async_auto_reset_event_operation* m_waiters;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
class async_auto_reset_event_operation
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
async_auto_reset_event_operation() noexcept;
|
||||||
|
|
||||||
|
explicit async_auto_reset_event_operation(const async_auto_reset_event& event) noexcept;
|
||||||
|
|
||||||
|
async_auto_reset_event_operation(const async_auto_reset_event_operation& other) noexcept;
|
||||||
|
|
||||||
|
bool await_ready() const noexcept { return m_event == nullptr; }
|
||||||
|
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept;
|
||||||
|
void await_resume() const noexcept {}
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
friend class async_auto_reset_event;
|
||||||
|
|
||||||
|
const async_auto_reset_event* m_event;
|
||||||
|
async_auto_reset_event_operation* m_next;
|
||||||
|
std::experimental::coroutine_handle<> m_awaiter;
|
||||||
|
std::atomic<std::uint32_t> m_refCount;
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
281
lib/async_auto_reset_event.cpp
Normal file
281
lib/async_auto_reset_event.cpp
Normal file
|
@ -0,0 +1,281 @@
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Copyright (c) Lewis Baker
|
||||||
|
// Licenced under MIT license. See LICENSE.txt for details.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include <cppcoro/async_auto_reset_event.hpp>
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
namespace local
|
||||||
|
{
|
||||||
|
// Some helpers for manipulating the 'm_state' value.
|
||||||
|
|
||||||
|
constexpr std::uint64_t set_increment = 1;
|
||||||
|
constexpr std::uint64_t waiter_increment = std::uint64_t(1) << 32;
|
||||||
|
|
||||||
|
constexpr std::uint32_t get_set_count(std::uint64_t state)
|
||||||
|
{
|
||||||
|
return static_cast<std::uint32_t>(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
constexpr std::uint32_t get_waiter_count(std::uint64_t state)
|
||||||
|
{
|
||||||
|
return static_cast<std::uint32_t>(state >> 32);
|
||||||
|
}
|
||||||
|
|
||||||
|
constexpr std::uint32_t get_resumable_waiter_count(std::uint64_t state)
|
||||||
|
{
|
||||||
|
return std::min(get_set_count(state), get_waiter_count(state));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event::async_auto_reset_event(bool initiallySet) noexcept
|
||||||
|
: m_state(initiallySet ? local::set_increment : 0)
|
||||||
|
, m_newWaiters(nullptr)
|
||||||
|
, m_waiters(nullptr)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event::~async_auto_reset_event()
|
||||||
|
{
|
||||||
|
assert(m_newWaiters.load(std::memory_order_relaxed) == nullptr);
|
||||||
|
assert(m_waiters == nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event_operation
|
||||||
|
cppcoro::async_auto_reset_event::operator co_await() const noexcept
|
||||||
|
{
|
||||||
|
std::uint64_t oldState = m_state.load(std::memory_order_relaxed);
|
||||||
|
if (local::get_set_count(oldState) > local::get_waiter_count(oldState))
|
||||||
|
{
|
||||||
|
// Try to synchronously acquire the event.
|
||||||
|
if (m_state.compare_exchange_strong(
|
||||||
|
oldState,
|
||||||
|
oldState - local::set_increment,
|
||||||
|
std::memory_order_acquire,
|
||||||
|
std::memory_order_relaxed))
|
||||||
|
{
|
||||||
|
// Acquired the event, return an operation object that
|
||||||
|
// won't suspend.
|
||||||
|
return async_auto_reset_event_operation{};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return async_auto_reset_event_operation{ *this };
|
||||||
|
}
|
||||||
|
|
||||||
|
void cppcoro::async_auto_reset_event::set() noexcept
|
||||||
|
{
|
||||||
|
std::uint64_t oldState = m_state.load(std::memory_order_relaxed);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
if (local::get_set_count(oldState) > local::get_waiter_count(oldState))
|
||||||
|
{
|
||||||
|
// Already set.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment the set-count
|
||||||
|
} while (!m_state.compare_exchange_weak(
|
||||||
|
oldState,
|
||||||
|
oldState + local::set_increment,
|
||||||
|
std::memory_order_acq_rel,
|
||||||
|
std::memory_order_acquire));
|
||||||
|
|
||||||
|
// Did we transition from non-zero waiters and zero set-count
|
||||||
|
// to non-zero set-count?
|
||||||
|
// If so then we acquired the lock and are responsible for resuming waiters.
|
||||||
|
if (oldState != 0 && local::get_set_count(oldState) == 0)
|
||||||
|
{
|
||||||
|
// We acquired the lock.
|
||||||
|
resume_waiters(oldState + local::set_increment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void cppcoro::async_auto_reset_event::reset() noexcept
|
||||||
|
{
|
||||||
|
std::uint64_t oldState = m_state.load(std::memory_order_relaxed);
|
||||||
|
while (local::get_set_count(oldState) > local::get_waiter_count(oldState))
|
||||||
|
{
|
||||||
|
if (m_state.compare_exchange_weak(
|
||||||
|
oldState,
|
||||||
|
oldState - local::set_increment,
|
||||||
|
std::memory_order_relaxed))
|
||||||
|
{
|
||||||
|
// Successfully reset.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not set. Nothing to do.
|
||||||
|
}
|
||||||
|
|
||||||
|
void cppcoro::async_auto_reset_event::resume_waiters(
|
||||||
|
std::uint64_t initialState) const noexcept
|
||||||
|
{
|
||||||
|
async_auto_reset_event_operation* waitersToResumeList = nullptr;
|
||||||
|
async_auto_reset_event_operation** waitersToResumeListEnd = &waitersToResumeList;
|
||||||
|
|
||||||
|
std::uint32_t waiterCountToResume = local::get_resumable_waiter_count(initialState);
|
||||||
|
|
||||||
|
assert(waiterCountToResume > 0);
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
// Dequeue 'waiterCountToResume' from m_waiters/m_newWaiters and
|
||||||
|
// push them onto 'waitersToResumeList'.
|
||||||
|
for (std::uint32_t i = 0; i < waiterCountToResume; ++i)
|
||||||
|
{
|
||||||
|
if (m_waiters == nullptr)
|
||||||
|
{
|
||||||
|
// We've run out of of waiters that we can consume without synchronisation
|
||||||
|
// Dequeue the list of new waiters atomically.
|
||||||
|
auto* newWaiters = m_newWaiters.exchange(nullptr, std::memory_order_acquire);
|
||||||
|
|
||||||
|
// There should always be enough waiters in the list as
|
||||||
|
// the waiters are queued before the waiter-count is incremented.
|
||||||
|
assert(newWaiters != nullptr);
|
||||||
|
|
||||||
|
// Reverse order of new waiters so they are resumed in FIFO.
|
||||||
|
// This ensures fairness.
|
||||||
|
//
|
||||||
|
// The alternative would be to not reverse the list and instead
|
||||||
|
// resume waiters in the reverse order they were queued in.
|
||||||
|
// This might result in better cache locality (most recently
|
||||||
|
// suspended coroutine might still be in cache).
|
||||||
|
// It should still provide a bounded wait time as well since we
|
||||||
|
// are guaranteed to process all waiters in this list before
|
||||||
|
// looking at any waiters newly queued after this point.
|
||||||
|
// Something to consider.
|
||||||
|
do
|
||||||
|
{
|
||||||
|
auto* next = newWaiters->m_next;
|
||||||
|
newWaiters->m_next = m_waiters;
|
||||||
|
m_waiters = newWaiters;
|
||||||
|
newWaiters = next;
|
||||||
|
} while (newWaiters != nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(m_waiters != nullptr);
|
||||||
|
|
||||||
|
// Pop the next waiter off the list
|
||||||
|
auto* waiterToResume = m_waiters;
|
||||||
|
m_waiters = m_waiters->m_next;
|
||||||
|
|
||||||
|
// Push it onto the end of the list of waiters to resume
|
||||||
|
waiterToResume->m_next = nullptr;
|
||||||
|
*waitersToResumeListEnd = waiterToResume;
|
||||||
|
waitersToResumeListEnd = &waiterToResume->m_next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We've now removed 'waiterCountToResume' waiters from the list
|
||||||
|
// so we can now decrement both the waiter and set count.
|
||||||
|
//
|
||||||
|
// However, there might have been more waiters or more calls to
|
||||||
|
// set() since we last checked so we need to go around again if
|
||||||
|
// there are still waiters that are ready to resume after decrementing
|
||||||
|
// both the 'waiter count' and 'set count' by 'waiterCountToResume'.
|
||||||
|
const std::uint64_t delta =
|
||||||
|
std::uint64_t(waiterCountToResume) |
|
||||||
|
std::uint64_t(waiterCountToResume) << 32;
|
||||||
|
|
||||||
|
// Needs to be 'release' as we're releasing the lock and anyone that
|
||||||
|
// subsequently acquires the lock needs to see our prior writes to
|
||||||
|
// m_waiters.
|
||||||
|
// Needs to be 'acquire' in the case that new waiters were added so
|
||||||
|
// that we see their prior writes to 'm_newWaiters'.
|
||||||
|
const std::uint64_t newState =
|
||||||
|
m_state.fetch_sub(delta, std::memory_order_acq_rel) - delta;
|
||||||
|
|
||||||
|
waiterCountToResume = local::get_resumable_waiter_count(newState);
|
||||||
|
} while (waiterCountToResume > 0);
|
||||||
|
|
||||||
|
// Now resume all of the waiters we've dequeued.
|
||||||
|
// There should be at least one.
|
||||||
|
assert(waitersToResumeList != nullptr);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
auto* const waiter = waitersToResumeList;
|
||||||
|
|
||||||
|
// Read 'next' before resuming since resuming the waiter is
|
||||||
|
// likely to destroy the waiter object.
|
||||||
|
auto* const next = waitersToResumeList->m_next;
|
||||||
|
|
||||||
|
// Decrement reference count and see if we decremented the last
|
||||||
|
// reference and if so then we are responsible for resuming.
|
||||||
|
// If not, then await_suspend() is responsible for resuming by
|
||||||
|
// returning 'false' and not suspending.
|
||||||
|
if (waiter->m_refCount.fetch_sub(1, std::memory_order_release) == 1)
|
||||||
|
{
|
||||||
|
waiter->m_awaiter.resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
waitersToResumeList = next;
|
||||||
|
} while (waitersToResumeList != nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event_operation::async_auto_reset_event_operation() noexcept
|
||||||
|
: m_event(nullptr)
|
||||||
|
{}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event_operation::async_auto_reset_event_operation(
|
||||||
|
const async_auto_reset_event& event) noexcept
|
||||||
|
: m_event(&event)
|
||||||
|
, m_refCount(2)
|
||||||
|
{}
|
||||||
|
|
||||||
|
cppcoro::async_auto_reset_event_operation::async_auto_reset_event_operation(
|
||||||
|
const async_auto_reset_event_operation& other) noexcept
|
||||||
|
: m_event(other.m_event)
|
||||||
|
, m_refCount(2)
|
||||||
|
{}
|
||||||
|
|
||||||
|
bool cppcoro::async_auto_reset_event_operation::await_suspend(
|
||||||
|
std::experimental::coroutine_handle<> awaiter) noexcept
|
||||||
|
{
|
||||||
|
m_awaiter = awaiter;
|
||||||
|
|
||||||
|
// Queue the waiter to the m_newWaiters list.
|
||||||
|
async_auto_reset_event_operation* head = m_event->m_newWaiters.load(std::memory_order_relaxed);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
m_next = head;
|
||||||
|
} while (!m_event->m_newWaiters.compare_exchange_weak(
|
||||||
|
head,
|
||||||
|
this,
|
||||||
|
std::memory_order_release,
|
||||||
|
std::memory_order_relaxed));
|
||||||
|
|
||||||
|
// Increment the waiter count.
|
||||||
|
// Needs to be 'release' so that our prior write to m_newWaiters is
|
||||||
|
// visible to anyone that acquires the lock.
|
||||||
|
// Needs to be 'acquire' in case we acquired the lock so we can see
|
||||||
|
// others' writes to m_newWaiters and writes prior to set() calls.
|
||||||
|
constexpr std::uint64_t waiterIncrement = std::uint64_t(1) << 32;
|
||||||
|
const std::uint64_t oldState =
|
||||||
|
m_event->m_state.fetch_add(waiterIncrement, std::memory_order_acq_rel);
|
||||||
|
|
||||||
|
if (oldState != 0 && local::get_waiter_count(oldState) == 0)
|
||||||
|
{
|
||||||
|
// We transitioned from non-zero set and zero waiters to
|
||||||
|
// non-zero set and non-zero waiters, so we acquired the lock
|
||||||
|
// and thus responsibility for resuming waiters.
|
||||||
|
m_event->resume_waiters(oldState + waiterIncrement);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decrement the ref-count to indicate that this waiter is now safe
|
||||||
|
// to resume. We don't want it to resume while we're still accessing the
|
||||||
|
// m_event object as resuming it might cause the event object to be
|
||||||
|
// destructed.
|
||||||
|
//
|
||||||
|
// Need 'acquire' semantics here in the case that another thread has
|
||||||
|
// concurrently dequeued us and scheduled us for resumption by decrementing
|
||||||
|
// the ref-count with 'release' semantics so that we see the writes prior
|
||||||
|
// to the 'set()' call that released this waiter.
|
||||||
|
return m_refCount.fetch_sub(1, std::memory_order_acquire) != 1;
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import cake.path
|
||||||
from cake.tools import compiler, script, env, project, variant
|
from cake.tools import compiler, script, env, project, variant
|
||||||
|
|
||||||
includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [
|
includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [
|
||||||
|
'async_auto_reset_event.hpp',
|
||||||
'async_generator.hpp',
|
'async_generator.hpp',
|
||||||
'async_mutex.hpp',
|
'async_mutex.hpp',
|
||||||
'broken_promise.hpp',
|
'broken_promise.hpp',
|
||||||
|
@ -45,6 +46,7 @@ privateHeaders = script.cwd([
|
||||||
])
|
])
|
||||||
|
|
||||||
sources = script.cwd([
|
sources = script.cwd([
|
||||||
|
'async_auto_reset_event.cpp',
|
||||||
'async_mutex.cpp',
|
'async_mutex.cpp',
|
||||||
'cancellation_state.cpp',
|
'cancellation_state.cpp',
|
||||||
'cancellation_token.cpp',
|
'cancellation_token.cpp',
|
||||||
|
|
144
test/async_auto_reset_event_tests.cpp
Normal file
144
test/async_auto_reset_event_tests.cpp
Normal file
|
@ -0,0 +1,144 @@
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Copyright (c) Lewis Baker
|
||||||
|
// Licenced under MIT license. See LICENSE.txt for details.
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include <cppcoro/async_auto_reset_event.hpp>
|
||||||
|
|
||||||
|
#include <cppcoro/task.hpp>
|
||||||
|
#include <cppcoro/io_service.hpp>
|
||||||
|
#include <cppcoro/on_scope_exit.hpp>
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
#include <cassert>
|
||||||
|
|
||||||
|
#include "doctest/doctest.h"
|
||||||
|
|
||||||
|
TEST_SUITE_BEGIN("async_auto_reset_event");
|
||||||
|
|
||||||
|
TEST_CASE("single waiter")
|
||||||
|
{
|
||||||
|
cppcoro::async_auto_reset_event event;
|
||||||
|
|
||||||
|
bool started = false;
|
||||||
|
bool finished = false;
|
||||||
|
auto run = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
started = true;
|
||||||
|
co_await event;
|
||||||
|
finished = true;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto t = run();
|
||||||
|
|
||||||
|
CHECK(started);
|
||||||
|
CHECK(!finished);
|
||||||
|
|
||||||
|
event.set();
|
||||||
|
|
||||||
|
CHECK(finished);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("multiple waiters")
|
||||||
|
{
|
||||||
|
cppcoro::async_auto_reset_event event;
|
||||||
|
|
||||||
|
auto run = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
co_await event;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto t1 = run();
|
||||||
|
auto t2 = run();
|
||||||
|
|
||||||
|
CHECK(!t1.is_ready());
|
||||||
|
CHECK(!t2.is_ready());
|
||||||
|
|
||||||
|
event.set();
|
||||||
|
|
||||||
|
CHECK(t1.is_ready());
|
||||||
|
CHECK(!t2.is_ready());
|
||||||
|
|
||||||
|
event.set();
|
||||||
|
|
||||||
|
CHECK(t1.is_ready());
|
||||||
|
CHECK(t2.is_ready());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("multi-threaded")
|
||||||
|
{
|
||||||
|
cppcoro::io_service ioService;
|
||||||
|
|
||||||
|
std::thread thread1{ [&] { ioService.process_events(); } };
|
||||||
|
auto joinOnExit1 = cppcoro::on_scope_exit([&] { thread1.join(); });
|
||||||
|
|
||||||
|
std::thread thread2{ [&] { ioService.process_events(); } };
|
||||||
|
auto joinOnExit2 = cppcoro::on_scope_exit([&] { thread2.join(); });
|
||||||
|
|
||||||
|
std::thread thread3{ [&] { ioService.process_events(); } };
|
||||||
|
auto joinOnExit3 = cppcoro::on_scope_exit([&] { thread3.join(); });
|
||||||
|
|
||||||
|
auto run = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
cppcoro::async_auto_reset_event event;
|
||||||
|
|
||||||
|
int value = 0;
|
||||||
|
|
||||||
|
auto startWaiter = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
co_await ioService.schedule();
|
||||||
|
co_await event;
|
||||||
|
++value;
|
||||||
|
event.set();
|
||||||
|
};
|
||||||
|
|
||||||
|
auto startSignaller = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
co_await ioService.schedule();
|
||||||
|
value = 5;
|
||||||
|
event.set();
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<cppcoro::task<>> waiters;
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; ++i)
|
||||||
|
{
|
||||||
|
waiters.emplace_back(startWaiter());
|
||||||
|
}
|
||||||
|
|
||||||
|
co_await startSignaller();
|
||||||
|
|
||||||
|
for (auto& waiter : waiters)
|
||||||
|
{
|
||||||
|
co_await waiter;
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Can't use CHECK() here because it's not thread-safe
|
||||||
|
assert(value == 1005);
|
||||||
|
};
|
||||||
|
|
||||||
|
auto runMany = [&]() -> cppcoro::task<>
|
||||||
|
{
|
||||||
|
std::vector<cppcoro::task<>> tasks;
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; ++i)
|
||||||
|
{
|
||||||
|
tasks.emplace_back(run());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& t : tasks)
|
||||||
|
{
|
||||||
|
co_await t;
|
||||||
|
}
|
||||||
|
|
||||||
|
ioService.stop();
|
||||||
|
};
|
||||||
|
|
||||||
|
auto t = runMany();
|
||||||
|
|
||||||
|
joinOnExit1.call_now();
|
||||||
|
joinOnExit2.call_now();
|
||||||
|
joinOnExit3.call_now();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_SUITE_END();
|
|
@ -22,6 +22,7 @@ sources = script.cwd([
|
||||||
'recursive_generator_tests.cpp',
|
'recursive_generator_tests.cpp',
|
||||||
'async_generator_tests.cpp',
|
'async_generator_tests.cpp',
|
||||||
'async_mutex_tests.cpp',
|
'async_mutex_tests.cpp',
|
||||||
|
'async_auto_reset_event_tests.cpp',
|
||||||
'cancellation_token_tests.cpp',
|
'cancellation_token_tests.cpp',
|
||||||
'lazy_task_tests.cpp',
|
'lazy_task_tests.cpp',
|
||||||
'shared_lazy_task_tests.cpp',
|
'shared_lazy_task_tests.cpp',
|
||||||
|
|
Loading…
Reference in a new issue