Add cppcoro::single_producer_sequencer class.

- Add sequence_range in support of this.
- Add sequence_traits::size_type.
This commit is contained in:
Lewis Baker 2017-11-28 22:59:21 +10:30
parent c687f0d4bb
commit 2cd5fce40c
6 changed files with 427 additions and 0 deletions

View file

@ -0,0 +1,102 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_SEQUENCE_RANGE_HPP_INCLUDED
#define CPPCORO_SEQUENCE_RANGE_HPP_INCLUDED
#include <cppcoro/sequence_traits.hpp>
#include <algorithm>
#include <iterator>
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_range
{
public:
using value_type = SEQUENCE;
using difference_type = typename TRAITS::difference_type;
using size_type = typename TRAITS::size_type;
class const_iterator
{
public:
using iterator_category = std::random_access_iterator_tag;
using value_type = SEQUENCE;
using difference_type = typename TRAITS::difference_type;
using reference = const SEQUENCE&;
using pointer = const SEQUENCE*;
explicit constexpr const_iterator(SEQUENCE value) noexcept : m_value(value) {}
const SEQUENCE& operator*() const noexcept { return m_value; }
const SEQUENCE* operator->() const noexcept { return std::addressof(m_value); }
const_iterator& operator++() noexcept { ++m_value; return *this; }
const_iterator& operator--() noexcept { --m_value; return *this; }
const_iterator operator++(int) noexcept { return const_iterator(m_value++); }
const_iterator operator--(int) noexcept { return const_iterator(m_value--); }
constexpr difference_type operator-(const_iterator other) const noexcept { return TRAITS::difference(m_value, other.m_value); }
constexpr const_iterator operator-(difference_type delta) const noexcept { return const_iterator{ static_cast<SEQUENCE>(m_value - delta) }; }
constexpr const_iterator operator+(difference_type delta) const noexcept { return const_iterator{ static_cast<SEQUENCE>(m_value + delta) }; }
constexpr bool operator==(const_iterator other) const noexcept { return m_value == other.m_value; }
constexpr bool operator!=(const_iterator other) const noexcept { return m_value != other.m_value; }
private:
SEQUENCE m_value;
};
constexpr sequence_range() noexcept
: m_begin()
, m_end()
{}
constexpr sequence_range(SEQUENCE begin, SEQUENCE end) noexcept
: m_begin(begin)
, m_end(end)
{}
constexpr const_iterator begin() const noexcept { return const_iterator(m_begin); }
constexpr const_iterator end() const noexcept { return const_iterator(m_end); }
constexpr SEQUENCE front() const noexcept { return m_begin; }
constexpr SEQUENCE back() const noexcept { return m_end - 1; }
constexpr size_type size() const noexcept
{
return static_cast<size_type>(TRAITS::difference(m_end, m_begin));
}
constexpr bool empty() const noexcept
{
return m_begin == m_end;
}
constexpr SEQUENCE operator[](size_type index) const noexcept
{
return m_begin + index;
}
constexpr sequence_range first(size_type count) const noexcept
{
return sequence_range{ m_begin, static_cast<SEQUENCE>(m_begin + std::min(size(), count)) };
}
private:
SEQUENCE m_begin;
SEQUENCE m_end;
};
}
#endif

View file

@ -14,6 +14,7 @@ namespace cppcoro
{
using value_type = SEQUENCE;
using difference_type = std::make_signed_t<SEQUENCE>;
using size_type = std::make_unsigned_t<SEQUENCE>;
static constexpr value_type initial_sequence = static_cast<value_type>(-1);

View file

@ -0,0 +1,203 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
#define CPPCORO_SINGLE_PRODUCER_SEQUENCER_HPP_INCLUDED
#include <cppcoro/sequence_barrier.hpp>
#include <cppcoro/sequence_range.hpp>
namespace cppcoro
{
template<typename SEQUENCE, typename TRAITS>
class single_producer_sequencer_claim_one_operation;
template<typename SEQUENCE, typename TRAITS>
class single_producer_sequencer_claim_operation;
template<
typename SEQUENCE = std::size_t,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer(
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std::size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept
: m_consumerBarrier(consumerBarrier)
, m_bufferSize(bufferSize)
, m_nextToClaim(initialSequence + 1)
, m_producerBarrier(initialSequence)
{}
/// Claim a slot in the ring buffer asynchronously.
///
/// \return
/// Returns an operation that when awaited will suspend the coroutine until
/// a slot is available for writing in the ring buffer. The result of the
/// co_await expression will be the sequence number of the slot.
/// The caller must publish() the claimed sequence number once they have written to
/// the ring-buffer.
[[nodiscard]]
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS> claim_one() noexcept;
/// Claim one or more contiguous slots in the ring-buffer.
///
/// Use this method over many calls to claim_one() when you have multiple elements to
/// enqueue. This will claim as many slots as are available up to the specified count
/// but may claim as few as one slot if only one slot is available.
///
/// \param count
/// The maximum number of slots to claim.
///
/// \return
/// Returns an awaitable object that when awaited returns a sequence_range that contains
/// the range of sequence numbers that were claimed. Once you have written element values
/// to all of the claimed slots you must publish() the sequence range in order to make
/// the elements available to consumers.
[[nodiscard]]
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS> claim_up_to(std::size_t count) noexcept;
void publish(SEQUENCE sequence) noexcept
{
m_producerBarrier.publish(sequence);
}
void publish(const sequence_range<SEQUENCE, TRAITS>& sequences) noexcept
{
m_producerBarrier.publish(sequences.back());
}
SEQUENCE last_published() const noexcept
{
return m_producerBarrier.last_published();
}
[[nodiscard]]
auto wait_until_published(SEQUENCE targetSequence) const noexcept
{
return m_producerBarrier.wait_until_published(targetSequence);
}
private:
friend class single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>;
friend class single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>;
#if CPPCORO_COMPILER_MSVC
# pragma warning(push)
# pragma warning(disable : 4324) // C4324: structure was padded due to alignment specifier
#endif
const sequence_barrier<SEQUENCE, TRAITS>& m_consumerBarrier;
const std::size_t m_bufferSize;
alignas(std::hardware_destructive_interference_size)
SEQUENCE m_nextToClaim;
sequence_barrier<SEQUENCE, TRAITS> m_producerBarrier;
#if CPPCORO_COMPILER_MSVC
# pragma warning(pop)
#endif
};
template<typename SEQUENCE, typename TRAITS>
class single_producer_sequencer_claim_one_operation
{
public:
single_producer_sequencer_claim_one_operation(
single_producer_sequencer<SEQUENCE, TRAITS>& sequencer) noexcept
: m_consumerWaitOperation(sequencer.m_consumerBarrier, static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize))
, m_sequencer(sequencer)
{}
bool await_ready() const noexcept
{
return m_consumerWaitOperation.await_ready();
}
auto await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
}
SEQUENCE await_resume() const noexcept
{
return m_sequencer.m_nextToClaim++;
}
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_consumerWaitOperation;
single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
};
template<typename SEQUENCE, typename TRAITS>
class single_producer_sequencer_claim_operation
{
public:
single_producer_sequencer_claim_operation(
single_producer_sequencer<SEQUENCE, TRAITS>& sequencer,
std::size_t count) noexcept
: m_consumerWaitOperation(sequencer.m_consumerBarrier, static_cast<SEQUENCE>(sequencer.m_nextToClaim - sequencer.m_bufferSize))
, m_sequencer(sequencer)
, m_count(count)
{}
bool await_ready() const noexcept
{
return m_consumerWaitOperation.await_ready();
}
auto await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
return m_consumerWaitOperation.await_suspend(awaitingCoroutine);
}
sequence_range<SEQUENCE, TRAITS> await_resume() noexcept
{
const SEQUENCE lastAvailableSequence =
static_cast<SEQUENCE>(m_consumerWaitOperation.await_resume() + m_sequencer.m_bufferSize);
const SEQUENCE begin = m_sequencer.m_nextToClaim;
const std::size_t availableCount = static_cast<std::size_t>(lastAvailableSequence - begin) + 1;
const std::size_t countToClaim = std::min(m_count, availableCount);
const SEQUENCE end = static_cast<SEQUENCE>(begin + countToClaim);
m_sequencer.m_nextToClaim = end;
return sequence_range<SEQUENCE, TRAITS>(begin, end);
}
private:
sequence_barrier_wait_operation<SEQUENCE, TRAITS> m_consumerWaitOperation;
single_producer_sequencer<SEQUENCE, TRAITS>& m_sequencer;
std::size_t m_count;
};
template<typename SEQUENCE, typename TRAITS>
[[nodiscard]]
single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_one() noexcept
{
return single_producer_sequencer_claim_one_operation<SEQUENCE, TRAITS>{ *this };
}
template<typename SEQUENCE, typename TRAITS>
[[nodiscard]]
single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>
single_producer_sequencer<SEQUENCE, TRAITS>::claim_up_to(std::size_t count) noexcept
{
return single_producer_sequencer_claim_operation<SEQUENCE, TRAITS>(*this, count);
}
}
#endif

View file

@ -22,6 +22,7 @@ includes = cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', [
'task.hpp',
'sequence_barrier.hpp',
'sequence_traits.hpp',
'single_producer_sequencer.hpp',
'shared_task.hpp',
'shared_task.hpp',
'single_consumer_event.hpp',

View file

@ -32,6 +32,7 @@ sources = script.cwd([
'shared_task_tests.cpp',
'sync_wait_tests.cpp',
'single_consumer_async_auto_reset_event_tests.cpp',
'single_producer_sequencer_tests.cpp',
'when_all_tests.cpp',
'when_all_ready_tests.cpp',
])

View file

@ -0,0 +1,119 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#include <cppcoro/single_producer_sequencer.hpp>
#include <cppcoro/config.hpp>
#include <cppcoro/sequence_barrier.hpp>
#include <cppcoro/sequence_traits.hpp>
#include <cppcoro/on_scope_exit.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/when_all.hpp>
#include <cppcoro/task.hpp>
#if CPPCORO_OS_WINNT
# include <cppcoro/io_service.hpp>
# include <thread>
#endif
#include <ostream>
#include "doctest/doctest.h"
DOCTEST_TEST_SUITE_BEGIN("single_producer_sequencer");
using namespace cppcoro;
#if CPPCORO_OS_WINNT
DOCTEST_TEST_CASE("multi-threaded usage single consumer")
{
io_service ioSvc;
// Spin up 2 io threads
std::thread ioThread1{ [&] { ioSvc.process_events(); } };
auto joinOnExit1 = on_scope_exit([&] { ioThread1.join(); });
auto stopOnExit1 = on_scope_exit([&] { ioSvc.stop(); });
std::thread ioThread2{ [&] { ioSvc.process_events(); } };
auto joinOnExit2 = on_scope_exit([&] { ioThread2.join(); });
auto stopOnExit2 = std::move(stopOnExit1);
constexpr std::size_t bufferSize = 256;
sequence_barrier<std::size_t> readBarrier;
single_producer_sequencer<std::size_t> sequencer(readBarrier, bufferSize);
constexpr std::size_t iterationCount = 1'000'000;
std::uint64_t buffer[bufferSize];
auto[result, dummy] = sync_wait(when_all(
[&]() -> task<std::uint64_t>
{
// Consumer
co_await ioSvc.schedule();
std::uint64_t sum = 0;
bool reachedEnd = false;
std::size_t nextToRead = 0;
do
{
std::size_t available = sequencer.last_published();
if (sequence_traits<std::size_t>::precedes(available, nextToRead))
{
available = co_await sequencer.wait_until_published(nextToRead);
co_await ioSvc.schedule();
}
do
{
sum += buffer[nextToRead % bufferSize];
} while (nextToRead++ != available);
// Zero value is sentinel that indicates the end of the stream.
reachedEnd = buffer[available % bufferSize] == 0;
// Notify that we've finished processing up to 'available'.
readBarrier.publish(available);
} while (!reachedEnd);
co_return sum;
}(),
[&]() -> task<>
{
// Producer
co_await ioSvc.schedule();
constexpr std::size_t maxBatchSize = 10;
std::size_t i = 0;
while (i < iterationCount)
{
const std::size_t batchSize = std::min(maxBatchSize, iterationCount - i);
auto sequences = co_await sequencer.claim_up_to(batchSize);
for (auto seq : sequences)
{
buffer[seq % bufferSize] = ++i;
}
sequencer.publish(sequences.back());
}
auto finalSeq = co_await sequencer.claim_one();
buffer[finalSeq % bufferSize] = 0;
sequencer.publish(finalSeq);
}()));
// Suppress unused variable warning.
(void)dummy;
constexpr std::uint64_t expectedResult =
std::uint64_t(iterationCount) * std::uint64_t(iterationCount + 1) / 2;
CHECK(result == expectedResult);
}
#endif
DOCTEST_TEST_SUITE_END();