cppcoro/include/cppcoro/multi_producer_sequencer.hpp

///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
#define CPPCORO_MULTI_PRODUCER_SEQUENCER_HPP_INCLUDED
#include <cppcoro/config.hpp>
#include <cppcoro/awaitable_traits.hpp>
#include <cppcoro/sequence_barrier.hpp>
#include <cppcoro/sequence_range.hpp>
#include <cppcoro/sequence_traits.hpp>
#include <cppcoro/detail/manual_lifetime.hpp>
#include <cppcoro/detail/get_awaiter.hpp>
#include <atomic>
#include <cstdint>
#include <cassert>
#include <limits>
#include <memory>
#include <type_traits>
#include <experimental/coroutine>
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_operation;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_operation;
template<typename SEQUENCE, typename TRAITS>
class multi_producer_sequencer_wait_operation_base;
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_wait_operation;
/// A multi-producer sequencer is a thread-synchronisation primitive that can be
/// used to synchronise access to a ring-buffer of power-of-two size where you
/// have multiple producers concurrently claiming slots in the ring-buffer and
/// publishing items.
///
/// When a writer wants to write to a slot in the buffer it first atomically
/// increments a counter by the number of slots it wishes to allocate.
/// It then waits until all of those slots have become available and then
/// returns the range of sequence numbers allocated back to the caller.
/// The caller then writes to those slots and, when done, publishes them by
/// writing each slot's sequence number to the corresponding element of an
/// array of equal size to the ring buffer.
/// When a reader wants to check whether the next sequence number is available
/// it simply reads the corresponding slot of this array and checks whether the
/// value stored there is equal to the sequence number it wants to read.
///
/// This means concurrent writers are wait-free when there is space available
/// in the ring buffer, requiring a single atomic fetch-add operation as the
/// only contended write operation. All other writes are to memory locations
/// owned by a particular writer. Concurrent writers can publish items out of
/// order so that one writer does not hold up other writers until the ring
/// buffer fills up.
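///
/// A minimal setup sketch (illustrative names; the ring buffer and the
/// consumer-side sequence_barrier are owned by the caller):
///
///   constexpr std::size_t bufferSize = 256;              // must be a power of two
///   std::uint64_t buffer[bufferSize];
///   cppcoro::sequence_barrier<std::size_t> readBarrier;  // consumers publish to this
///   cppcoro::multi_producer_sequencer<std::size_t> sequencer{ readBarrier, bufferSize };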
template<
typename SEQUENCE = std::size_t,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence);
/// The size of the circular buffer. This will be a power-of-two.
std::size_t buffer_size() const noexcept { return m_sequenceMask + 1; }
/// Look up the last-known-published sequence number after the specified
/// sequence number.
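///
/// For example (an illustrative sketch), a consumer that has already seen every
/// item up to 'lastRead' can poll for newly published items without suspending:
///
///   const std::size_t available = sequencer.last_published_after(lastRead);
///   if (available != lastRead)
///   {
///       // Items with sequence numbers in (lastRead, available] are now readable.
///   }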
SEQUENCE last_published_after(SEQUENCE lastKnownPublished) const noexcept;
/// Wait until the specified target sequence number has been published.
///
/// Returns an awaitable type that when co_awaited will suspend the awaiting
/// coroutine until the specified 'targetSequence' number and all prior sequence
/// numbers have been published.
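///
/// A minimal consumer sketch (illustrative; assumes cppcoro::task and
/// cppcoro::static_thread_pool, the default initial sequence of SEQUENCE(-1),
/// a hypothetical process() function, and that 'readBarrier' is the
/// sequence_barrier passed to the sequencer's constructor):
///
///   cppcoro::task<> consume(
///       cppcoro::static_thread_pool& tp,
///       const cppcoro::multi_producer_sequencer<std::size_t>& sequencer,
///       cppcoro::sequence_barrier<std::size_t>& readBarrier,
///       const std::uint64_t* buffer, std::size_t bufferMask)
///   {
///       std::size_t nextToRead = 0;
///       for (;;)
///       {
///           // Wait until 'nextToRead' and all prior sequence numbers are published.
///           const std::size_t available =
///               co_await sequencer.wait_until_published(nextToRead, nextToRead - 1, tp);
///           do { process(buffer[nextToRead & bufferMask]); } while (nextToRead++ != available);
///           // Let producers reuse the slots we have finished reading.
///           readBarrier.publish(available);
///       }
///   }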
template<typename SCHEDULER>
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER> wait_until_published(
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept;
/// Query if there are currently any slots available for claiming.
///
/// Note that the return value is only approximate when there are multiple
/// producers, since another thread may claim the last available slot
/// immediately after this returns true.
bool any_available() const noexcept;
/// Claim a single slot in the buffer and wait until that slot becomes available.
///
/// Returns an Awaitable type that yields the sequence number of the slot that
/// was claimed.
///
/// Once the producer has claimed a slot then they are free to write to that
/// slot within the ring buffer. Once the value has been initialised the item
/// must be published by calling the .publish() method, passing the sequence
/// number.
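///
/// A minimal producer sketch (illustrative; assumes cppcoro::task and
/// cppcoro::static_thread_pool as the scheduler, and that 'buffer'/'bufferMask'
/// name the caller-owned ring buffer that consumers read from):
///
///   cppcoro::task<> produce_one(
///       cppcoro::static_thread_pool& tp,
///       cppcoro::multi_producer_sequencer<std::size_t>& sequencer,
///       std::uint64_t* buffer, std::size_t bufferMask, std::uint64_t value)
///   {
///       const std::size_t seq = co_await sequencer.claim_one(tp);
///       buffer[seq & bufferMask] = value;  // write to the claimed slot
///       sequencer.publish(seq);            // make it visible to consumers
///   }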
template<typename SCHEDULER>
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
claim_one(SCHEDULER& scheduler) noexcept;
/// Claim a contiguous range of sequence numbers corresponding to slots within
/// a ring-buffer.
///
/// This will claim at most the specified count of sequence numbers, but may
/// claim fewer if the buffer contains fewer slots than the requested count.
/// It will always claim at least one sequence number.
///
/// Returns an awaitable that will yield a sequence_range object containing the
/// sequence numbers that were claimed.
///
/// The caller is responsible for ensuring that they publish every element of the
/// returned sequence range by calling .publish().
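///
/// A minimal batch-producer sketch (illustrative; same assumptions as the
/// claim_one() example above, with 'values'/'count' as hypothetical input):
///
///   cppcoro::task<> produce_batch(
///       cppcoro::static_thread_pool& tp,
///       cppcoro::multi_producer_sequencer<std::size_t>& sequencer,
///       std::uint64_t* buffer, std::size_t bufferMask,
///       const std::uint64_t* values, std::size_t count)
///   {
///       std::size_t written = 0;
///       while (written < count)
///       {
///           auto range = co_await sequencer.claim_up_to(count - written, tp);
///           for (std::size_t seq : range)
///           {
///               buffer[seq & bufferMask] = values[written++];
///           }
///           sequencer.publish(range);  // publish the whole range at once
///       }
///   }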
template<typename SCHEDULER>
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept;
/// Publish the element with the specified sequence number, making it available
/// to consumers.
///
/// Note that different sequence numbers may be published by different producer
/// threads out of order. A sequence number will not become available to consumers
/// until all preceding sequence numbers have also been published.
///
/// \param sequence
/// The sequence number of the element to publish.
/// This sequence number must have been previously acquired via a call to 'claim_one()'
/// or 'claim_up_to()'.
void publish(SEQUENCE sequence) noexcept;
/// Publish a contiguous range of sequence numbers, making each of them available
/// to consumers.
///
/// This is equivalent to calling publish(seq) for each sequence number, seq, in
/// the specified range, but is more efficient since it only checks once whether
/// there are any coroutines that need to be woken up.
void publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept;
private:
template<typename SEQUENCE2, typename TRAITS2>
friend class multi_producer_sequencer_wait_operation_base;
template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
friend class multi_producer_sequencer_claim_operation;
template<typename SEQUENCE2, typename TRAITS2, typename SCHEDULER>
friend class multi_producer_sequencer_claim_one_operation;
void resume_ready_awaiters() noexcept;
void add_awaiter(multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept;
#if CPPCORO_COMPILER_MSVC
# pragma warning(push)
# pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
#endif
const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
const std::size_t m_sequenceMask;
const std::unique_ptr<std::atomic<SEQUENCE>[]> m_published;
alignas(CPPCORO_CPU_CACHE_LINE)
std::atomic<SEQUENCE> m_nextToClaim;
alignas(CPPCORO_CPU_CACHE_LINE)
mutable std::atomic<multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>*> m_awaiters;
#if CPPCORO_COMPILER_MSVC
# pragma warning(pop)
#endif
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_awaiter
{
public:
multi_producer_sequencer_claim_awaiter(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
const sequence_range<SEQUENCE, TRAITS>& claimedRange,
SCHEDULER& scheduler) noexcept
: m_barrierWait(consumerBarrier, claimedRange.back() - bufferSize, scheduler)
, m_claimedRange(claimedRange)
{}
bool await_ready() const noexcept
{
return m_barrierWait.await_ready();
}
auto await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
return m_barrierWait.await_suspend(awaitingCoroutine);
}
sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
{
return m_claimedRange;
}
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_barrierWait;
sequence_range<SEQUENCE, TRAITS> m_claimedRange;
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_operation
{
public:
multi_producer_sequencer_claim_operation(
multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
std::size_t count,
SCHEDULER& scheduler) noexcept
: m_sequencer(sequencer)
, m_count(count < sequencer.buffer_size() ? count : sequencer.buffer_size())
, m_scheduler(scheduler)
{
}
multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
{
// We wait until the awaitable is actually co_await'ed before we claim the
// range of elements. If we claimed them earlier then it would be possible for
// the caller to fail to co_await the result, e.g. due to an exception, which
// would leave the sequence numbers unable to be published and would eventually
// deadlock consumers that waited on them.
//
// TODO: We could try and acquire only as many as are available if fewer than
// m_count elements are available. This would complicate the logic here somewhat
// as we'd need to use a compare-exchange instead.
const SEQUENCE first = m_sequencer.m_nextToClaim.fetch_add(m_count, std::memory_order_relaxed);
return multi_producer_sequencer_claim_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
m_sequencer.m_consumerBarrier,
m_sequencer.buffer_size(),
sequence_range<SEQUENCE, TRAITS>{ first, first + m_count },
m_scheduler
};
}
private:
multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
std::size_t m_count;
SCHEDULER& m_scheduler;
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_awaiter
{
public:
multi_producer_sequencer_claim_one_awaiter(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
SEQUENCE claimedSequence,
SCHEDULER& scheduler) noexcept
: m_waitOp(consumerBarrier, claimedSequence - bufferSize, scheduler)
, m_claimedSequence(claimedSequence)
{}
bool await_ready() const noexcept
{
return m_waitOp.await_ready();
}
auto await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
return m_waitOp.await_suspend(awaitingCoroutine);
}
SEQUENCE await_resume() noexcept
{
return m_claimedSequence;
}
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS, SCHEDULER> m_waitOp;
SEQUENCE m_claimedSequence;
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_claim_one_operation
{
public:
multi_producer_sequencer_claim_one_operation(
multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SCHEDULER& scheduler) noexcept
: m_sequencer(sequencer)
, m_scheduler(scheduler)
{}
multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER> operator co_await() noexcept
{
return multi_producer_sequencer_claim_one_awaiter<SEQUENCE, TRAITS, SCHEDULER>{
m_sequencer.m_consumerBarrier,
m_sequencer.buffer_size(),
m_sequencer.m_nextToClaim.fetch_add(1, std::memory_order_relaxed),
m_scheduler
};
}
private:
multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
SCHEDULER& m_scheduler;
};
template<typename SEQUENCE, typename TRAITS>
class multi_producer_sequencer_wait_operation_base
{
public:
multi_producer_sequencer_wait_operation_base(
const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished) noexcept
: m_sequencer(sequencer)
, m_targetSequence(targetSequence)
, m_lastKnownPublished(lastKnownPublished)
, m_readyToResume(false)
{}
multi_producer_sequencer_wait_operation_base(
const multi_producer_sequencer_wait_operation_base& other) noexcept
: m_sequencer(other.m_sequencer)
, m_targetSequence(other.m_targetSequence)
, m_lastKnownPublished(other.m_lastKnownPublished)
, m_readyToResume(false)
{}
bool await_ready() const noexcept
{
return !TRAITS::precedes(m_lastKnownPublished, m_targetSequence);
}
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
m_awaitingCoroutine = awaitingCoroutine;
m_sequencer.add_awaiter(this);
// Mark the waiter as ready to resume.
// If it was already marked as ready-to-resume within the call to add_awaiter() or
// on another thread then this exchange() will return true. In this case we want to
// resume immediately and continue execution by returning false.
return !m_readyToResume.exchange(true, std::memory_order_acquire);
}
SEQUENCE await_resume() noexcept
{
return m_lastKnownPublished;
}
protected:
friend class multi_producer_sequencer<SEQUENCE, TRAITS>;
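// Called by the sequencer once the target sequence number (or a later one) has
// been published. The exchange() below pairs with the exchange() in await_suspend():
// whichever of the two runs second observes 'true' and takes responsibility for
// resuming the coroutine (await_suspend() by returning false so the coroutine
// continues without suspending, resume() by calling resume_impl()).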
void resume(SEQUENCE lastKnownPublished) noexcept
{
m_lastKnownPublished = lastKnownPublished;
if (m_readyToResume.exchange(true, std::memory_order_release))
{
resume_impl();
}
}
virtual void resume_impl() noexcept = 0;
const multi_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
SEQUENCE m_targetSequence;
SEQUENCE m_lastKnownPublished;
multi_producer_sequencer_wait_operation_base* m_next;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
std::atomic<bool> m_readyToResume;
};
template<typename SEQUENCE, typename TRAITS, typename SCHEDULER>
class multi_producer_sequencer_wait_operation :
public multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>
{
using schedule_operation = decltype(std::declval<SCHEDULER&>().schedule());
public:
multi_producer_sequencer_wait_operation(
const multi_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) noexcept
: multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(sequencer, targetSequence, lastKnownPublished)
, m_scheduler(scheduler)
{}
multi_producer_sequencer_wait_operation(
const multi_producer_sequencer_wait_operation& other) noexcept
: multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>(other)
, m_scheduler(other.m_scheduler)
{}
~multi_producer_sequencer_wait_operation()
{
if (m_isScheduleAwaiterCreated)
{
m_scheduleAwaiter.destruct();
}
if (m_isScheduleOperationCreated)
{
m_scheduleOperation.destruct();
}
}
SEQUENCE await_resume() noexcept(noexcept(m_scheduleOperation->await_resume()))
{
if (m_isScheduleOperationCreated)
{
m_scheduleOperation->await_resume();
}
return multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>::await_resume();
}
private:
void resume_impl() noexcept override
{
try
{
m_scheduleOperation.construct(m_scheduler.schedule());
m_isScheduleOperationCreated = true;
m_scheduleAwaiter.construct(detail::get_awaiter(
static_cast<schedule_operation&&>(*m_scheduleOperation)));
m_isScheduleAwaiterCreated = true;
if (!m_scheduleAwaiter->await_ready())
{
using await_suspend_result_t = decltype(m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine));
if constexpr (std::is_void_v<await_suspend_result_t>)
{
m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine);
return;
}
else if constexpr (std::is_same_v<await_suspend_result_t, bool>)
{
if (m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine))
{
return;
}
}
else
{
// Assume it returns a coroutine_handle.
m_scheduleAwaiter->await_suspend(this->m_awaitingCoroutine).resume();
return;
}
}
}
catch (...)
{
// Ignore failure to reschedule and resume inline?
// Should we catch the exception and rethrow from await_resume()?
// Or should we require that 'co_await scheduler.schedule()' is noexcept?
}
// Resume outside the catch-block.
this->m_awaitingCoroutine.resume();
}
SCHEDULER& m_scheduler;
// Can't use std::optional<T> here since T could be a reference.
detail::manual_lifetime<schedule_operation> m_scheduleOperation;
detail::manual_lifetime<typename awaitable_traits<schedule_operation>::awaiter_t> m_scheduleAwaiter;
bool m_isScheduleOperationCreated = false;
bool m_isScheduleAwaiterCreated = false;
};
template<typename SEQUENCE, typename TRAITS>
multi_producer_sequencer<SEQUENCE, TRAITS>::multi_producer_sequencer(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
SEQUENCE initialSequence)
: m_consumerBarrier(consumerBarrier)
, m_sequenceMask(bufferSize - 1)
, m_published(std::make_unique<std::atomic<SEQUENCE>[]>(bufferSize))
, m_nextToClaim(initialSequence + 1)
, m_awaiters(nullptr)
{
// bufferSize must be a positive power-of-two
assert(bufferSize > 0 && (bufferSize & (bufferSize - 1)) == 0);
// but must be no larger than the max diff value.
using diff_t = typename TRAITS::difference_type;
using unsigned_diff_t = std::make_unsigned_t<diff_t>;
constexpr unsigned_diff_t maxSize = static_cast<unsigned_diff_t>(std::numeric_limits<diff_t>::max());
assert(bufferSize <= maxSize);
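// Pre-fill the published array so that each slot initially holds the sequence
// number exactly buffer_size() behind the first sequence that will map to that
// slot; the "is it published?" check (stored value == sequence) therefore fails
// until a producer actually publishes. For example (illustrative values), with
// bufferSize == 4 and the default initialSequence of SEQUENCE(-1), slots 0..3
// are initialised with sequences -4, -3, -2 and -1.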
SEQUENCE seq = initialSequence - (bufferSize - 1);
do
{
#ifdef __cpp_lib_atomic_value_initialization
m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
#else // ^^^ __cpp_lib_atomic_value_initialization // !__cpp_lib_atomic_value_initialization vvv
std::atomic_init(&m_published[seq & m_sequenceMask], seq);
#endif // !__cpp_lib_atomic_value_initialization
} while (seq++ != initialSequence);
}
template<typename SEQUENCE, typename TRAITS>
SEQUENCE multi_producer_sequencer<SEQUENCE, TRAITS>::last_published_after(
SEQUENCE lastKnownPublished) const noexcept
{
const auto mask = m_sequenceMask;
SEQUENCE seq = lastKnownPublished + 1;
while (m_published[seq & mask].load(std::memory_order_acquire) == seq)
{
lastKnownPublished = seq++;
}
return lastKnownPublished;
}
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::wait_until_published(
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept
{
return multi_producer_sequencer_wait_operation<SEQUENCE, TRAITS, SCHEDULER>{
*this, targetSequence, lastKnownPublished, scheduler
};
}
template<typename SEQUENCE, typename TRAITS>
bool multi_producer_sequencer<SEQUENCE, TRAITS>::any_available() const noexcept
{
return TRAITS::precedes(
m_nextToClaim.load(std::memory_order_relaxed),
m_consumerBarrier.last_published() + buffer_size());
}
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_one(SCHEDULER& scheduler) noexcept
{
return multi_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, scheduler };
}
template<typename SEQUENCE, typename TRAITS>
template<typename SCHEDULER>
multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>
multi_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count, SCHEDULER& scheduler) noexcept
{
return multi_producer_sequencer_claim_operation<SEQUENCE, TRAITS, SCHEDULER>{ *this, count, scheduler };
}
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(SEQUENCE sequence) noexcept
{
m_published[sequence & m_sequenceMask].store(sequence, std::memory_order_seq_cst);
// Resume any waiters that might have been satisfied by this publish operation.
resume_ready_awaiters();
}
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::publish(const sequence_range<SEQUENCE, TRAITS>& range) noexcept
{
if (range.empty())
{
return;
}
// Publish all but the first sequence number using relaxed atomics.
// No consumer should be reading those subsequent sequence numbers until they've seen
// that the first sequence number in the range is published.
for (SEQUENCE seq : range.skip(1))
{
m_published[seq & m_sequenceMask].store(seq, std::memory_order_relaxed);
}
// Now publish the first sequence number with seq_cst semantics.
m_published[range.front() & m_sequenceMask].store(range.front(), std::memory_order_seq_cst);
// Resume any waiters that might have been satisfied by this publish operation.
resume_ready_awaiters();
}
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::resume_ready_awaiters() noexcept
{
using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
awaiter_t* awaiters = m_awaiters.load(std::memory_order_seq_cst);
if (awaiters == nullptr)
{
// No awaiters
return;
}
// There were some awaiters. Try to acquire the list of waiters with an
// atomic exchange as we might be racing with other consumers/producers.
awaiters = m_awaiters.exchange(nullptr, std::memory_order_seq_cst);
if (awaiters == nullptr)
{
// Didn't acquire the list
// Some other thread is now responsible for resuming them. Our job is done.
return;
}
SEQUENCE lastKnownPublished;
awaiter_t* awaitersToResume;
awaiter_t** awaitersToResumeTail = &awaitersToResume;
awaiter_t* awaitersToRequeue;
awaiter_t** awaitersToRequeueTail = &awaitersToRequeue;
do
{
using diff_t = typename TRAITS::difference_type;
lastKnownPublished = last_published_after(awaiters->m_lastKnownPublished);
// First scan the list of awaiters and split them into 'requeue' and 'resume' lists.
auto minDiff = std::numeric_limits<diff_t>::max();
do
{
auto diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
if (diff > 0)
{
// Not ready yet.
minDiff = diff < minDiff ? diff : minDiff;
*awaitersToRequeueTail = awaiters;
awaitersToRequeueTail = &awaiters->m_next;
}
else
{
*awaitersToResumeTail = awaiters;
awaitersToResumeTail = &awaiters->m_next;
}
awaiters->m_lastKnownPublished = lastKnownPublished;
awaiters = awaiters->m_next;
} while (awaiters != nullptr);
// Null-terminate the requeue list.
*awaitersToRequeueTail = nullptr;
if (awaitersToRequeue != nullptr)
{
// Requeue the waiters that are not ready yet.
awaiter_t* oldHead = nullptr;
while (!m_awaiters.compare_exchange_weak(oldHead, awaitersToRequeue, std::memory_order_seq_cst, std::memory_order_relaxed))
{
*awaitersToRequeueTail = oldHead;
}
// Reset the awaitersToRequeue list
awaitersToRequeueTail = &awaitersToRequeue;
const SEQUENCE earliestTargetSequence = lastKnownPublished + minDiff;
// Now we need to check again to see if any of the waiters we just enqueued
// is now satisfied by a concurrent call to publish().
//
// We need to be a bit more careful here since we are no longer holding any
// awaiters and so producers/consumers may advance the sequence number arbitrarily
// far. If the sequence number advances more than buffer_size() ahead of the
// earliestTargetSequence then the m_published[] array may have sequence numbers
// that have advanced beyond earliestTargetSequence, potentially even wrapping
// sequence numbers around to then be preceding where they were before. If this
// happens then we don't need to worry about resuming any awaiters that were waiting
// for 'earliestTargetSequence' since some other thread has already resumed them.
// So the only case we need to worry about here is when all m_published entries for
// sequence numbers in range [lastKnownPublished + 1, earliestTargetSequence] have
// published sequence numbers that match the range.
const auto sequenceMask = m_sequenceMask;
SEQUENCE seq = lastKnownPublished + 1;
while (m_published[seq & sequenceMask].load(std::memory_order_seq_cst) == seq)
{
lastKnownPublished = seq;
if (seq == earliestTargetSequence)
{
// At least one of the awaiters we just requeued is now satisfied.
// Reacquire the list of awaiters and continue around the outer loop.
awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
break;
}
++seq;
}
}
} while (awaiters != nullptr);
// Null-terminate list of awaiters to resume.
*awaitersToResumeTail = nullptr;
while (awaitersToResume != nullptr)
{
awaiter_t* next = awaitersToResume->m_next;
awaitersToResume->resume(lastKnownPublished);
awaitersToResume = next;
}
}
template<typename SEQUENCE, typename TRAITS>
void multi_producer_sequencer<SEQUENCE, TRAITS>::add_awaiter(
multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>* awaiter) const noexcept
{
using awaiter_t = multi_producer_sequencer_wait_operation_base<SEQUENCE, TRAITS>;
SEQUENCE targetSequence = awaiter->m_targetSequence;
SEQUENCE lastKnownPublished = awaiter->m_lastKnownPublished;
awaiter_t* awaitersToEnqueue = awaiter;
awaiter_t** awaitersToEnqueueTail = &awaiter->m_next;
awaiter_t* awaitersToResume;
awaiter_t** awaitersToResumeTail = &awaitersToResume;
const SEQUENCE sequenceMask = m_sequenceMask;
do
{
// Enqueue the awaiters.
{
awaiter_t* oldHead = m_awaiters.load(std::memory_order_relaxed);
do
{
*awaitersToEnqueueTail = oldHead;
} while (!m_awaiters.compare_exchange_weak(
oldHead,
awaitersToEnqueue,
std::memory_order_seq_cst,
std::memory_order_relaxed));
}
// Reset list of waiters
awaitersToEnqueueTail = &awaitersToEnqueue;
// Check to see if the last-known published sequence number has advanced
// while we were enqueuing the awaiters. Need to use seq_cst memory order
// here to ensure that if there are concurrent calls to publish() that would
// wake up any of the awaiters we just enqueued that either we will see their
// write to m_published slots or they will see our write to m_awaiters.
//
// Note also that we are assuming the last-known published sequence will not
// advance more than buffer_size() ahead of targetSequence, since there is at
// least one consumer that won't be resumed and thus can't publish the sequence
// number it's waiting for to its sequence_barrier, and so producers won't be
// able to claim its slot in the buffer.
//
// TODO: Check whether we can weaken the memory order here to use 'seq_cst' only
// on the first .load() and 'acquire' on subsequent loads.
while (m_published[(lastKnownPublished + 1) & sequenceMask].load(std::memory_order_seq_cst) == (lastKnownPublished + 1))
{
++lastKnownPublished;
}
if (!TRAITS::precedes(lastKnownPublished, targetSequence))
{
// At least one awaiter we just enqueued has now been satisfied.
// To ensure it is woken up we need to reacquire the list of awaiters and resume
// any that are now ready.
awaiter_t* awaiters = m_awaiters.exchange(nullptr, std::memory_order_acquire);
using diff_t = typename TRAITS::difference_type;
diff_t minDiff = std::numeric_limits<diff_t>::max();
while (awaiters != nullptr)
{
// Compare each awaiter against its own target sequence number.
diff_t diff = TRAITS::difference(awaiters->m_targetSequence, lastKnownPublished);
if (diff > 0)
{
// Not yet ready.
minDiff = diff < minDiff ? diff : minDiff;
*awaitersToEnqueueTail = awaiters;
awaitersToEnqueueTail = &awaiters->m_next;
awaiters->m_lastKnownPublished = lastKnownPublished;
}
else
{
// Now ready.
*awaitersToResumeTail = awaiters;
awaitersToResumeTail = &awaiters->m_next;
}
awaiters = awaiters->m_next;
}
// Calculate the earliest sequence number that any awaiters in the
// awaitersToEnqueue list are waiting for. We'll use this next time
// around the loop.
targetSequence = static_cast<SEQUENCE>(lastKnownPublished + minDiff);
}
// Null-terminate list of awaiters to enqueue.
*awaitersToEnqueueTail = nullptr;
} while (awaitersToEnqueue != nullptr);
// Null-terminate awaiters to resume.
*awaitersToResumeTail = nullptr;
// Finally, resume any awaiters we've found that are ready to go.
while (awaitersToResume != nullptr)
{
// Read m_next before calling .resume() as resuming could destroy the awaiter.
awaiter_t* next = awaitersToResume->m_next;
awaitersToResume->resume(lastKnownPublished);
awaitersToResume = next;
}
}
}
#endif