Make the ring-buffer wait operations take a scheduler.

The awaiting coroutine will be resumed using the scheduler if it
is suspended waiting for a sequence number. This simplifies code
as the caller doesn't need to remember to manually reschedule.
Forcing a scheduler to provided also prevents the producer/consumer
coroutines from effectively becoming single-threaded.
This commit is contained in:
Lewis Baker 2018-10-30 23:31:36 -07:00
parent 669fabed26
commit 3cdc22bbed
8 changed files with 514 additions and 152 deletions

View file

@ -0,0 +1,120 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_DETAIL_MANUAL_LIFETIME_HPP_INCLUDED
#define CPPCORO_DETAIL_MANUAL_LIFETIME_HPP_INCLUDED
#include <type_traits>
#include <memory>
namespace cppcoro::detail
{
template<typename T>
struct manual_lifetime
{
public:
manual_lifetime() noexcept {}
~manual_lifetime() noexcept {}
manual_lifetime(const manual_lifetime&) = delete;
manual_lifetime(manual_lifetime&&) = delete;
manual_lifetime& operator=(const manual_lifetime&) = delete;
manual_lifetime& operator=(manual_lifetime&&) = delete;
template<typename... Args>
std::enable_if_t<std::is_constructible_v<T, Args&&...>> construct(Args&&... args)
noexcept(std::is_nothrow_constructible_v<T, Args&&...>)
{
::new (static_cast<void*>(std::addressof(m_value))) T(static_cast<Args&&>(args)...);
}
void destruct() noexcept(std::is_nothrow_destructible_v<T>)
{
m_value.~T();
}
std::add_pointer_t<T> operator->() noexcept { return std::addressof(**this); }
std::add_pointer_t<const T> operator->() const noexcept { return std::addressof(**this); }
T& operator*() & noexcept { return m_value; }
const T& operator*() const & noexcept { return m_value; }
T&& operator*() && noexcept { return static_cast<T&&>(m_value); }
const T&& operator*() const && noexcept { return static_cast<const T&&>(m_value); }
private:
union {
T m_value;
};
};
template<typename T>
struct manual_lifetime<T&>
{
public:
manual_lifetime() noexcept {}
~manual_lifetime() noexcept {}
manual_lifetime(const manual_lifetime&) = delete;
manual_lifetime(manual_lifetime&&) = delete;
manual_lifetime& operator=(const manual_lifetime&) = delete;
manual_lifetime& operator=(manual_lifetime&&) = delete;
void construct(T& value) noexcept
{
m_value = std::addressof(value);
}
void destruct() noexcept {}
T* operator->() noexcept { return m_value; }
const T* operator->() const noexcept { return m_value; }
T& operator*() noexcept { return *m_value; }
const T& operator*() const noexcept { return *m_value; }
private:
T* m_value;
};
template<typename T>
struct manual_lifetime<T&&>
{
public:
manual_lifetime() noexcept {}
~manual_lifetime() noexcept {}
manual_lifetime(const manual_lifetime&) = delete;
manual_lifetime(manual_lifetime&&) = delete;
manual_lifetime& operator=(const manual_lifetime&) = delete;
manual_lifetime& operator=(manual_lifetime&&) = delete;
void construct(T&& value) noexcept
{
m_value = std::addressof(value);
}
void destruct() noexcept {}
T* operator->() noexcept { return m_value; }
const T* operator->() const noexcept { return m_value; }
T& operator*() & noexcept { return *m_value; }
const T& operator*() const & noexcept { return *m_value; }
T&& operator*() && noexcept { return static_cast<T&&>(*m_value); }
const T&& operator*() const && noexcept { return static_cast<const T&&>(*m_value); }
private:
T* m_value;
};
template<>
struct manual_lifetime<void>
{
void construct() noexcept {}
void destruct() noexcept {}
void operator*() const noexcept {}
};
}
#endif

View file

@ -0,0 +1,25 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_INLINE_SCHEDULER_HPP_INCLUDED
#define CPPCORO_INLINE_SCHEDULER_HPP_INCLUDED
#include <experimental/coroutine>
namespace cppcoro
{
class inline_scheduler
{
public:
inline_scheduler() noexcept = default;
std::experimental::suspend_never schedule() const noexcept
{
return {};
}
};
}
#endif

View file

@ -10,19 +10,24 @@
#include <cppcoro/sequence_range.hpp>
#include <cppcoro/sequence_traits.hpp>
#include <cppcoro/detail/manual_lifetime.hpp>
#include <atomic>
#include <cstdint>
#include <cassert>
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_operation;
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_operation;
template<typename SEQUENCE, typename TRAITS>
class multi_producer_sequencer_wait_operation_base;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_wait_operation;
/// A multi-producer sequencer is a thread-synchronisation primitive that can be
@ -72,9 +77,11 @@ namespace cppcoro
/// Returns an awaitable type that when co_awaited will suspend the awaiting
/// coroutine until the specified 'targetSequence' number and all prior sequence
/// numbers have been published.
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS> wait_until_published(
template<typename SCHEDULER>
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished) const noexcept;
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept;
/// Query if there are currently any slots available for claiming.
///
@ -92,7 +99,9 @@ namespace cppcoro
/// slot within the ring buffer. Once the value has been initialised the item
/// must be published by calling the .publish() method, passing the sequence
/// number.
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS> claim_one() noexcept;
template<typename SCHEDULER>
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
claim_one(SCHEDULER& scheduler) noexcept;
/// Claim a contiguous range of sequence numbers corresponding to slots within
/// a ring-buffer.
@ -106,7 +115,9 @@ namespace cppcoro
///
/// The caller is responsible for ensuring that they publish every element of the
/// returned sequence range by calling .publish().
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS> claim_up_to(std::size_t count) noexcept;
template<typename SCHEDULER>
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept;
/// Publish the element with the specified sequence number, making it available
/// to consumers.
@ -131,12 +142,17 @@ namespace cppcoro
private:
friend class multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>;
friend class multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS>;
friend class multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>;
template<typename SEQUENCE, typename TRAITS>
friend class multi_producer_sequencer_wait_operation_base;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
friend class multi_producer_sequencer_claim_operation;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
friend class multi_producer_sequencer_claim_one_operation;
void resume_ready_awaiters() noexcept;
void add_awaiter(multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>* awaiter) const noexcept;
void add_awaiter(multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept;
#if CPPCORO_COMPILER_MSVC
# pragma warning(push)
@ -151,7 +167,7 @@ namespace cppcoro
std::atomic<SEQUENCE> m_nextToClaim;
alignas(CPPCORO_CPU_CACHE_LINE)
mutable std::atomic<multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>*> m_awaiters;
mutable std::atomic<multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>*> m_awaiters;
#if CPPCORO_COMPILER_MSVC
# pragma warning(pop)
@ -159,7 +175,7 @@ namespace cppcoro
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_awaiter
{
public:
@ -167,8 +183,9 @@ namespace cppcoro
multi_producer_sequencer_claim_awaiter(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
const sequence_range<SEQUENCE, TRAITS>& claimedRange) noexcept
: m_barrierWait(consumerBarrier, claimedRange.back() - bufferSize)
const sequence_range<SEQUENCE, TRAITS>& claimedRange,
SCHEDULER& scheduler) noexcept
: m_barrierWait(consumerBarrier, claimedRange.back() - bufferSize, scheduler)
, m_claimedRange(claimedRange)
{}
@ -189,25 +206,27 @@ namespace cppcoro
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_barrierWait;
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_barrierWait;
sequence_range<SEQUENCE, TRAITS> m_claimedRange;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_operation
{
public:
multi_producer_sequencer_claim_operation(
multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
std::size_t count) noexcept
std::size_t count,
SCHEDULER& scheduler) noexcept
: m_sequencer(sequencer)
, m_count(count < sequencer.buffer_size() ? count : sequencer.buffer_size())
, m_scheduler(scheduler)
{
}
multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS> operator co_await() noexcept
multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
{
// We wait until the awaitable is actually co_await'ed before we claim the
// range of elements. If we claimed them earlier, then it may be possible for
@ -219,10 +238,11 @@ namespace cppcoro
// m_count elements are available. This would complicate the logic here somewhat
// as we'd need to use a compare-exchange instead.
const SEQUENCE first = m_sequencer.m_nextToClaim.fetch_add(m_count, std::memory_order_relaxed);
return multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS>{
return multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
m_sequencer.m_consumerBarrier,
m_sequencer.buffer_size(),
sequence_range<SEQUENCE, TRAITS>{ first, first + m_count }
sequence_range<SEQUENCE, TRAITS>{ first, first + m_count },
m_scheduler
};
}
@ -230,10 +250,11 @@ namespace cppcoro
multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
std::size_t m_count;
SCHEDULER& m_scheduler;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_awaiter
{
public:
@ -241,8 +262,9 @@ namespace cppcoro
multi_producer_sequencer_claim_one_awaiter(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
SEQUENCE claimedSequence) noexcept
: m_waitOp(consumerBarrier, claimedSequence - bufferSize)
SEQUENCE claimedSequence,
SCHEDULER& scheduler) noexcept
: m_waitOp(consumerBarrier, claimedSequence - bufferSize, scheduler)
, m_claimedSequence(claimedSequence)
{}
@ -263,42 +285,46 @@ namespace cppcoro
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_waitOp;
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_waitOp;
SEQUENCE m_claimedSequence;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_operation
{
public:
multi_producer_sequencer_claim_one_operation(
multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer) noexcept
multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SCHEDULER& scheduler) noexcept
: m_sequencer(sequencer)
, m_scheduler(scheduler)
{}
multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS> operator co_await() noexcept
multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
{
return multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS>{
return multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
m_sequencer.m_consumerBarrier,
m_sequencer.buffer_size(),
m_sequencer.m_nextToClaim.fetch_add(1, std::memory_order_relaxed)
m_sequencer.m_nextToClaim.fetch_add(1, std::memory_order_relaxed),
m_scheduler
};
}
private:
multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
SCHEDULER& m_scheduler;
};
template<typename SEQUENCE, typename TRAITS>
class multi_producer_sequencer_wait_operation
class multi_producer_sequencer_wait_operation_base
{
public:
multi_producer_sequencer_wait_operation(
multi_producer_sequencer_wait_operation_base(
const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished) noexcept
@ -308,8 +334,8 @@ namespace cppcoro
, m_readyToResume(false)
{}
multi_producer_sequencer_wait_operation(
const multi_producer_sequencer_wait_operation& other) noexcept
multi_producer_sequencer_wait_operation_base(
const multi_producer_sequencer_wait_operation_base& other) noexcept
: m_sequencer(other.m_sequencer)
, m_targetSequence(other.m_targetSequence)
, m_lastKnownPublished(other.m_lastKnownPublished)
@ -339,7 +365,7 @@ namespace cppcoro
return m_lastKnownPublished;
}
private:
protected:
friend class multi_producer_sequencer<SEQUENCE, TRAITS>;
@ -348,18 +374,121 @@ namespace cppcoro
m_lastKnownPublished = lastKnownPublished;
if (m_readyToResume.exchange(true, std::memory_order_release))
{
m_awaitingCoroutine.resume();
resume_impl();
}
}
virtual void resume_impl() noexcept = 0;
const multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
SEQUENCE m_targetSequence;
SEQUENCE m_lastKnownPublished;
multi_producer_sequencer_wait_operation* m_next;
multi_producer_sequencer_wait_operation_base* m_next;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
std::atomic<bool> m_readyToResume;
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_wait_operation :
public multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>
{
using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
public:
multi_producer_sequencer_wait_operation(
const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) noexcept
: multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(sequencer, targetSequence, lastKnownPublished)
, m_scheduler(scheduler)
{}
multi_producer_sequencer_wait_operation(
const multi_producer_sequencer_wait_operation& other) noexcept
: multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(other)
, m_scheduler(other.m_scheduler)
{}
~multi_producer_sequencer_wait_operation()
{
if (m_isScheduleAwaiterCreated)
{
m_scheduleAwaiter.destruct();
}
if (m_isScheduleOperationCreated)
{
m_scheduleOperation.destruct();
}
}
SEQUENCE await_resume() noexcept(noexcept(m_scheduleOperation->await_resume()))
{
if (m_isScheduleOperationCreated)
{
m_scheduleOperation->await_resume();
}
return multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
}
private:
void resume_impl() noexcept override
{
try
{
m_scheduleOperation.construct(m_scheduler.schedule());
m_isScheduleOperationCreated = true;
m_scheduleAwaiter.construct(detail::get_awaiter(
static_cast<schedule_operation&&>(*m_scheduleOperation)));
m_isScheduleAwaiterCreated = true;
if (!m_scheduleAwaiter->await_ready())
{
using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(m_awaitingCoroutine));
if constexpr (std::is_void_v<await_suspend_result_t>)
{
m_scheduleAwaiter->await_suspend(m_awaitingCoroutine);
return;
}
else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
{
if (m_scheduleAwaiter->await_suspend(m_awaitingCoroutine))
{
return;
}
}
else
{
// Assume it returns a coroutine_handle.
m_scheduleAwaiter->await_suspend(m_awaitingCoroutine).resume();
return;
}
}
}
catch (...)
{
// Ignore failure to reschedule and resume inline?
// Should we catch the exception and rethrow from await_resume()?
// Or should we require that 'co_await scheduler.schedule()' is noexcept?
}
// Resume outside the catch-block.
m_awaitingCoroutine.resume();
}
SCHEDULER& m_scheduler;
// Can't use std::optional<T> here since T could be a reference.
detail::manual_lifetime<schedule_operation> m_scheduleOperation;
detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
bool m_isScheduleOperationCreated = false;
bool m_isScheduleAwaiterCreated = false;
};
template<typename SEQUENCE, typename TRAITS>
multi_producer_sequencer<SEQUENCE, TRAITS>::multi_producer_sequencer(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
@ -400,13 +529,15 @@ namespace cppcoro
}
template<typename SEQUENCE, typename TRAITS>
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>
template<typename SCHEDULER>
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::wait_until_published(
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished) const noexcept
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept
{
return multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>{
*this, targetSequence, lastKnownPublished
return multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>{
*this, targetSequence, lastKnownPublished, scheduler
};
}
@ -419,24 +550,24 @@ namespace cppcoro
}
template<typename SEQUENCE, typename TRAITS>
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_one() noexcept
template<typename SCHEDULER>
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
{
return multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>{ *this };
return multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
}
template<typename SEQUENCE, typename TRAITS>
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count) noexcept
template<typename SCHEDULER>
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
{
return multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS>{ *this, count };
return multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, count, scheduler };
}
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
{
using awaiter_t = multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>;
m_published[sequence & m_sequenceMask].store(sequence, std::memory_order_seq_cst);
// Resume any waiters that might have been satisfied by this publish operation.
@ -469,7 +600,7 @@ namespace cppcoro
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::resume_ready_awaiters() noexcept
{
using awaiter_t = multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>;
using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
awaiter_t* awaiters = m_awaiters.load(std::memory_order_seq_cst);
if (awaiters == nullptr)
@ -584,9 +715,9 @@ namespace cppcoro
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::add_awaiter(
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>* awaiter) const noexcept
multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept
{
using awaiter_t = multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS>;
using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
SEQUENCE targetSequence = awaiter->m_targetSequence;
SEQUENCE lastKnownPublished = awaiter->m_lastKnownPublished;

View file

@ -8,6 +8,7 @@
#include <cppcoro/config.hpp>
#include <cppcoro/awaitable_traits.hpp>
#include <cppcoro/sequence_traits.hpp>
#include <cppcoro/detail/manual_lifetime.hpp>
#include <atomic>
#include <cassert>
@ -19,6 +20,9 @@
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS>
class sequence_barrier_wait_operation_base;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class sequence_barrier_wait_operation;
/// A sequence barrier is a synchornisation primitive that allows a single-producer
@ -43,7 +47,7 @@ namespace cppcoro
std::is_integral_v<SEQUENCE>,
"sequence_barrier requires an integral sequence type");
using awaiter_t = sequence_barrier_wait_operation<SEQUENCE, TRAITS>;
using awaiter_t = sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
public:
@ -87,9 +91,11 @@ namespace cppcoro
/// number. This is guaranteed not to precede \p targetSequence but may be a sequence
/// number after \p targetSequence, which indicates that more elements have been
/// published than you were waiting for.
template<typename SCHEDULER>
[[nodiscard]]
sequence_barrier_wait_operation<SEQUENCE, TRAITS> wait_until_published(
SEQUENCE targetSequence) const noexcept;
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept;
/// Publish the specified sequence number to consumers.
///
@ -105,7 +111,7 @@ namespace cppcoro
private:
friend class sequence_barrier_wait_operation<SEQUENCE, TRAITS>;
friend class sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>;
void add_awaiter(awaiter_t* awaiter) const noexcept;
@ -129,11 +135,11 @@ namespace cppcoro
};
template<typename SEQUENCE, typename TRAITS>
class sequence_barrier_wait_operation
class sequence_barrier_wait_operation_base
{
public:
sequence_barrier_wait_operation(
explicit sequence_barrier_wait_operation_base(
const sequence_barrier<SEQUENCE, TRAITS>& barrier,
SEQUENCE targetSequence) noexcept
: m_barrier(barrier)
@ -142,8 +148,8 @@ namespace cppcoro
, m_readyToResume(false)
{}
sequence_barrier_wait_operation(
const sequence_barrier_wait_operation& other) noexcept
sequence_barrier_wait_operation_base(
const sequence_barrier_wait_operation_base& other) noexcept
: m_barrier(other.m_barrier)
, m_targetSequence(other.m_targetSequence)
, m_lastKnownPublished(other.m_lastKnownPublished)
@ -180,26 +186,122 @@ namespace cppcoro
}
}
virtual void resume_impl() noexcept
{
return m_awaitingCoroutine.resume();
}
virtual void resume_impl() noexcept = 0;
const sequence_barrier<SEQUENCE, TRAITS>& m_barrier;
const SEQUENCE m_targetSequence;
SEQUENCE m_lastKnownPublished;
sequence_barrier_wait_operation* m_next;
sequence_barrier_wait_operation_base* m_next;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
std::atomic<bool> m_readyToResume;
};
template<typename SEQUENCE, typename TRAITS>
[[nodiscard]]
sequence_barrier_wait_operation<SEQUENCE, TRAITS> sequence_barrier<SEQUENCE, TRAITS>::wait_until_published(
SEQUENCE targetSequence) const noexcept
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class sequence_barrier_wait_operation : public sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>
{
return sequence_barrier_wait_operation<SEQUENCE, TRAITS>{ *this, targetSequence };
using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
public:
sequence_barrier_wait_operation(
const sequence_barrier<SEQUENCE, TRAITS>& barrier,
SEQUENCE targetSequence,
SCHEDULER& scheduler) noexcept
: sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(barrier, targetSequence)
, m_scheduler(scheduler)
{}
sequence_barrier_wait_operation(
const sequence_barrier_wait_operation& other) noexcept
: sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>(other)
, m_scheduler(other.m_scheduler)
{}
~sequence_barrier_wait_operation()
{
if (m_isScheduleAwaiterCreated)
{
m_scheduleAwaiter.destruct();
}
if (m_isScheduleOperationCreated)
{
m_scheduleOperation.destruct();
}
}
decltype(auto) await_resume() noexcept(noexcept(m_scheduleAwaiter->await_resume()))
{
if (m_isScheduleAwaiterCreated)
{
m_scheduleAwaiter->await_resume();
}
return sequence_barrier_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
}
private:
void resume_impl() noexcept override
{
try
{
m_scheduleOperation.construct(m_scheduler.schedule());
m_isScheduleOperationCreated = true;
m_scheduleAwaiter.construct(detail::get_awaiter(
static_cast<schedule_operation&&>(*m_scheduleOperation)));
m_isScheduleAwaiterCreated = true;
if (!m_scheduleAwaiter->await_ready())
{
using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(m_awaitingCoroutine));
if constexpr (std::is_void_v<await_suspend_result_t>)
{
m_scheduleAwaiter->await_suspend(m_awaitingCoroutine);
return;
}
else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
{
if (m_scheduleAwaiter->await_suspend(m_awaitingCoroutine))
{
return;
}
}
else
{
// Assume it returns a coroutine_handle.
m_scheduleAwaiter->await_suspend(m_awaitingCoroutine).resume();
return;
}
}
}
catch (...)
{
// Ignore failure to reschedule and resume inline?
// Should we catch the exception and rethrow from await_resume()?
// Or should we require that 'co_await scheduler.schedule()' is noexcept?
}
// Resume outside the catch-block.
m_awaitingCoroutine.resume();
}
SCHEDULER& m_scheduler;
// Can't use std::optional<T> here since T could be a reference.
detail::manual_lifetime<schedule_operation> m_scheduleOperation;
detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
bool m_isScheduleOperationCreated = false;
bool m_isScheduleAwaiterCreated = false;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
[[nodiscard]]
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> sequence_barrier<SEQUENCE, TRAITS>::wait_until_published(
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept
{
return sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, targetSequence, scheduler);
}
template<typename SEQUENCE, typename TRAITS>

View file

@ -11,10 +11,10 @@
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_one_operation;
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_operation;
template<
@ -44,8 +44,10 @@ namespace cppcoro
/// co_await expression will be the sequence number of the slot.
/// The caller must publish() the claimed sequence number once they have written to
/// the ring-buffer.
template<typename SCHEDULER>
[[nodiscard]]
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS> claim_one() noexcept;
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
claim_one(SCHEDULER& scheduler) noexcept;
/// Claim one or more contiguous slots in the ring-buffer.
///
@ -61,8 +63,10 @@ namespace cppcoro
/// the range of sequence numbers that were claimed. Once you have written element values
/// to all of the claimed slots you must publish() the sequence range in order to make
/// the elements available to consumers.
template<typename SCHEDULER>
[[nodiscard]]
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS> claim_up_to(std::size_t count) noexcept;
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER> claim_up_to(
std::size_t count, SCHEDULER& scheduler) noexcept;
/// Publish the specified sequence number.
///
@ -103,16 +107,20 @@ namespace cppcoro
/// last-published sequence number, which is guaranteed to be at least 'seq' but may be some
/// subsequent sequence number if additional items were published while waiting for the
/// the requested sequence number to be published.
template<typename SCHEDULER>
[[nodiscard]]
auto wait_until_published(SEQUENCE targetSequence) const noexcept
auto wait_until_published(SEQUENCE targetSequence, SCHEDULER& scheduler) const noexcept
{
return m_producerBarrier.wait_until_published(targetSequence);
return m_producerBarrier.wait_until_published(targetSequence, scheduler);
}
private:
friend class single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>;
friend class single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
friend class single_producer_sequencer_claim_operation;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
friend class single_producer_sequencer_claim_one_operation;
#if CPPCORO_COMPILER_MSVC
# pragma warning(push)
@ -132,14 +140,18 @@ namespace cppcoro
#endif
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_one_operation
{
public:
single_producer_sequencer_claim_one_operation(
single_producer_sequencer<SEQUENCE, TRAITS>& sequencer) noexcept
: m_consumerWaitOperation(sequencer.m_consumerBarrier, static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize))
single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SCHEDULER& scheduler) noexcept
: m_consumerWaitOperation(
sequencer.m_consumerBarrier,
static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
scheduler)
, m_sequencer(sequencer)
{}
@ -160,20 +172,24 @@ namespace cppcoro
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_consumerWaitOperation;
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class single_producer_sequencer_claim_operation
{
public:
single_producer_sequencer_claim_operation(
explicit single_producer_sequencer_claim_operation(
single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
std::size_t count) noexcept
: m_consumerWaitOperation(sequencer.m_consumerBarrier, static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize))
std::size_t count,
SCHEDULER& scheduler) noexcept
: m_consumerWaitOperation(
sequencer.m_consumerBarrier,
static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize),
scheduler)
, m_sequencer(sequencer)
, m_count(count)
{}
@ -202,26 +218,28 @@ namespace cppcoro
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_consumerWaitOperation;
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_consumerWaitOperation;
single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
std::size_t m_count;
};
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
[[nodiscard]]
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_one() noexcept
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
{
return single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>{ *this };
return single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
}
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
[[nodiscard]]
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count) noexcept
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
{
return single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>(*this, count);
return single_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>(*this, count, scheduler);
}
}

View file

@ -42,17 +42,12 @@ namespace
std::uint64_t i = 0;
while (i < iterationCount)
{
const bool reschedule = !sequencer.any_available();
auto seq = co_await sequencer.claim_one();
if (reschedule)
{
co_await tp.schedule();
}
auto seq = co_await sequencer.claim_one(tp);
buffer[seq & mask] = ++i;
sequencer.publish(seq);
}
auto finalSeq = co_await sequencer.claim_one();
auto finalSeq = co_await sequencer.claim_one(tp);
buffer[finalSeq & mask] = 0;
sequencer.publish(finalSeq);
}
@ -64,8 +59,6 @@ namespace
std::uint64_t iterationCount,
std::size_t maxBatchSize)
{
co_await tp.schedule();
const std::size_t bufferSize = sequencer.buffer_size();
std::uint64_t i = 0;
@ -73,12 +66,7 @@ namespace
{
const std::size_t batchSize = static_cast<std::size_t>(
std::min<std::uint64_t>(maxBatchSize, iterationCount - i));
const bool reschedule = !sequencer.any_available();
auto sequences = co_await sequencer.claim_up_to(batchSize);
if (reschedule)
{
co_await tp.schedule();
}
auto sequences = co_await sequencer.claim_up_to(batchSize, tp);
for (auto seq : sequences)
{
buffer[seq % bufferSize] = ++i;
@ -86,7 +74,7 @@ namespace
sequencer.publish(sequences);
}
auto finalSeq = co_await sequencer.claim_one();
auto finalSeq = co_await sequencer.claim_one(tp);
buffer[finalSeq % bufferSize] = 0;
sequencer.publish(finalSeq);
}
@ -108,13 +96,7 @@ namespace
std::size_t nextToRead = 0;
do
{
std::size_t available = sequencer.last_published_after(nextToRead - 1);
if (sequence_traits<std::size_t>::precedes(available, nextToRead))
{
available = co_await sequencer.wait_until_published(nextToRead, nextToRead - 1);
co_await tp.schedule();
}
std::size_t available = co_await sequencer.wait_until_published(nextToRead, nextToRead - 1, tp);
do
{
const auto& value = buffer[nextToRead & mask];

View file

@ -10,6 +10,7 @@
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/when_all.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/inline_scheduler.hpp>
#include <stdio.h>
#include <thread>
@ -36,6 +37,8 @@ DOCTEST_TEST_CASE("constructing with initial sequence number")
DOCTEST_TEST_CASE("wait_until_published single-threaded")
{
inline_scheduler scheduler;
sequence_barrier<std::uint32_t> barrier;
bool reachedA = false;
bool reachedB = false;
@ -46,17 +49,17 @@ DOCTEST_TEST_CASE("wait_until_published single-threaded")
sync_wait(when_all(
[&]() -> task<>
{
CHECK(co_await barrier.wait_until_published(0) == 0);
CHECK(co_await barrier.wait_until_published(0, scheduler) == 0);
reachedA = true;
CHECK(co_await barrier.wait_until_published(1) == 1);
CHECK(co_await barrier.wait_until_published(1, scheduler) == 1);
reachedB = true;
CHECK(co_await barrier.wait_until_published(3) == 3);
CHECK(co_await barrier.wait_until_published(3, scheduler) == 3);
reachedC = true;
CHECK(co_await barrier.wait_until_published(4) == 10);
CHECK(co_await barrier.wait_until_published(4, scheduler) == 10);
reachedD = true;
co_await barrier.wait_until_published(5);
co_await barrier.wait_until_published(5, scheduler);
reachedE = true;
co_await barrier.wait_until_published(10);
co_await barrier.wait_until_published(10, scheduler);
reachedF = true;
}(),
[&]() -> task<>
@ -84,6 +87,8 @@ DOCTEST_TEST_CASE("wait_until_published single-threaded")
DOCTEST_TEST_CASE("wait_until_published multiple awaiters")
{
inline_scheduler scheduler;
sequence_barrier<std::uint32_t> barrier;
bool reachedA = false;
bool reachedB = false;
@ -93,18 +98,18 @@ DOCTEST_TEST_CASE("wait_until_published multiple awaiters")
sync_wait(when_all(
[&]() -> task<>
{
CHECK(co_await barrier.wait_until_published(0) == 0);
CHECK(co_await barrier.wait_until_published(0, scheduler) == 0);
reachedA = true;
CHECK(co_await barrier.wait_until_published(1) == 1);
CHECK(co_await barrier.wait_until_published(1, scheduler) == 1);
reachedB = true;
CHECK(co_await barrier.wait_until_published(3) == 3);
CHECK(co_await barrier.wait_until_published(3, scheduler) == 3);
reachedC = true;
}(),
[&]() -> task<>
{
CHECK(co_await barrier.wait_until_published(0) == 0);
CHECK(co_await barrier.wait_until_published(0, scheduler) == 0);
reachedD = true;
CHECK(co_await barrier.wait_until_published(3) == 3);
CHECK(co_await barrier.wait_until_published(3, scheduler) == 3);
reachedE = true;
}(),
[&]() -> task<>
@ -148,21 +153,13 @@ DOCTEST_TEST_CASE("multi-threaded usage single consumer")
[&]() -> task<std::uint64_t>
{
// Consumer
co_await tp.schedule();
std::uint64_t sum = 0;
bool reachedEnd = false;
std::size_t nextToRead = 0;
do
{
std::size_t available = writeBarrier.last_published();
if (sequence_traits<std::size_t>::precedes(available, nextToRead))
{
available = co_await writeBarrier.wait_until_published(nextToRead);
co_await tp.schedule();
}
std::size_t available = co_await writeBarrier.wait_until_published(nextToRead, tp);
do
{
sum += buffer[nextToRead % bufferSize];
@ -180,15 +177,12 @@ DOCTEST_TEST_CASE("multi-threaded usage single consumer")
[&]() -> task<>
{
// Producer
co_await tp.schedule();
std::size_t available = readBarrier.last_published() + bufferSize;
for (std::size_t nextToWrite = 0; nextToWrite <= iterationCount; ++nextToWrite)
{
if (sequence_traits<std::size_t>::precedes(available, nextToWrite))
{
available = co_await readBarrier.wait_until_published(nextToWrite - bufferSize) + bufferSize;
co_await tp.schedule();
available = co_await readBarrier.wait_until_published(nextToWrite - bufferSize, tp) + bufferSize;
}
if (nextToWrite == iterationCount)

View file

@ -40,21 +40,13 @@ DOCTEST_TEST_CASE("multi-threaded usage single consumer")
[&]() -> task<std::uint64_t>
{
// Consumer
co_await tp.schedule();
std::uint64_t sum = 0;
bool reachedEnd = false;
std::size_t nextToRead = 0;
do
{
std::size_t available = sequencer.last_published();
if (sequence_traits<std::size_t>::precedes(available, nextToRead))
{
available = co_await sequencer.wait_until_published(nextToRead);
co_await tp.schedule();
}
const std::size_t available = co_await sequencer.wait_until_published(nextToRead, tp);
do
{
sum += buffer[nextToRead % bufferSize];
@ -72,15 +64,13 @@ DOCTEST_TEST_CASE("multi-threaded usage single consumer")
[&]() -> task<>
{
// Producer
co_await tp.schedule();
constexpr std::size_t maxBatchSize = 10;
std::size_t i = 0;
while (i < iterationCount)
{
const std::size_t batchSize = std::min(maxBatchSize, iterationCount - i);
auto sequences = co_await sequencer.claim_up_to(batchSize);
auto sequences = co_await sequencer.claim_up_to(batchSize, tp);
for (auto seq : sequences)
{
buffer[seq % bufferSize] = ++i;
@ -88,7 +78,7 @@ DOCTEST_TEST_CASE("multi-threaded usage single consumer")
sequencer.publish(sequences.back());
}
auto finalSeq = co_await sequencer.claim_one();
auto finalSeq = co_await sequencer.claim_one(tp);
buffer[finalSeq % bufferSize] = 0;
sequencer.publish(finalSeq);
}()));