mirror of
https://github.com/oxen-io/lokinet
synced 2023-12-14 06:53:00 +01:00
Create QueueManager component with test suite
This commit is contained in:
parent
d01a5d73b3
commit
951a065867
5 changed files with 1903 additions and 2 deletions
4
.vscode/launch.json
vendored
4
.vscode/launch.json
vendored
|
@ -7,8 +7,8 @@
|
|||
"name": "(lldb) Launch",
|
||||
"type": "cppdbg",
|
||||
"request": "launch",
|
||||
"program": "${workspaceFolder}/lokinet",
|
||||
"args": [],
|
||||
"program": "${workspaceFolder}/build/testAll",
|
||||
"args": ["--gtest_shuffle", "--gtest_filter=-AbyssTest.TestClientAndServer"],
|
||||
"stopAtEntry": false,
|
||||
"cwd": "${workspaceFolder}",
|
||||
"environment": [],
|
||||
|
|
|
@ -266,6 +266,7 @@ set(LIB_PLATFORM_SRC
|
|||
# for logic
|
||||
llarp/timer.cpp
|
||||
# for threading
|
||||
llarp/queue_manager.cpp
|
||||
llarp/threadpool.cpp
|
||||
# for android shim
|
||||
${ANDROID_PLATFORM_SRC}
|
||||
|
@ -512,6 +513,7 @@ set(TEST_SRC
|
|||
test/test_dns_unit.cpp
|
||||
test/test_dnsc_unit.cpp
|
||||
test/test_dnsd_unit.cpp
|
||||
test/test_llarp_queue_manager.cpp
|
||||
)
|
||||
|
||||
|
||||
|
|
562
llarp/queue_manager.cpp
Normal file
562
llarp/queue_manager.cpp
Normal file
|
@ -0,0 +1,562 @@
|
|||
#include "queue_manager.hpp"
|
||||
|
||||
#include <thread>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace thread
|
||||
{
|
||||
// Turn an enum into its underlying value.
|
||||
template < typename E >
|
||||
constexpr auto
|
||||
to_underlying(E e) noexcept
|
||||
{
|
||||
return static_cast< std::underlying_type_t< E > >(e);
|
||||
}
|
||||
|
||||
static constexpr uint32_t GENERATION_COUNT_SHIFT = 0x2;
|
||||
|
||||
// Max number of generations which can be held in an uint32_t.
|
||||
static constexpr size_t NUM_ELEMENT_GENERATIONS = 1
|
||||
<< ((sizeof(uint32_t) * 8) - 2);
|
||||
|
||||
// mask for holding the element state from an element
|
||||
static constexpr uint32_t ELEMENT_STATE_MASK = 0x3;
|
||||
|
||||
// mask for holding the disabled bit in the index.
|
||||
static constexpr uint32_t DISABLED_STATE_MASK = 1
|
||||
<< ((sizeof(uint32_t) * 8) - 1);
|
||||
|
||||
// Max number of combinations of index and generations.
|
||||
static constexpr uint32_t NUM_COMBINED_INDEXES = DISABLED_STATE_MASK;
|
||||
|
||||
bool
|
||||
isDisabledFlagSet(uint32_t encodedIndex)
|
||||
{
|
||||
return (encodedIndex & DISABLED_STATE_MASK);
|
||||
}
|
||||
|
||||
uint32_t
|
||||
discardDisabledFlag(uint32_t encodedIndex)
|
||||
{
|
||||
return (encodedIndex & ~DISABLED_STATE_MASK);
|
||||
}
|
||||
|
||||
uint32_t
|
||||
encodeElement(uint32_t generation, ElementState state)
|
||||
{
|
||||
return (generation << GENERATION_COUNT_SHIFT) | to_underlying(state);
|
||||
}
|
||||
|
||||
uint32_t
|
||||
decodeGenerationFromElementState(uint32_t state)
|
||||
{
|
||||
return state >> GENERATION_COUNT_SHIFT;
|
||||
}
|
||||
|
||||
ElementState
|
||||
decodeStateFromElementState(uint32_t state)
|
||||
{
|
||||
return ElementState(state & ELEMENT_STATE_MASK);
|
||||
}
|
||||
|
||||
// Mutable access to the push-side combined index (also carries the
// queue-wide disabled bit).
QueueManager::AtomicIndex&
QueueManager::pushIndex()
{
  return m_pushIndex;
}

// Mutable access to the pop-side combined index.
QueueManager::AtomicIndex&
QueueManager::popIndex()
{
  return m_popIndex;
}

// Read-only access to the push-side combined index.
const QueueManager::AtomicIndex&
QueueManager::pushIndex() const
{
  return m_pushIndex;
}

// Read-only access to the pop-side combined index.
const QueueManager::AtomicIndex&
QueueManager::popIndex() const
{
  return m_popIndex;
}
|
||||
|
||||
uint32_t
|
||||
QueueManager::nextCombinedIndex(uint32_t index) const
|
||||
{
|
||||
if(m_maxCombinedIndex == index)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
return index + 1;
|
||||
}
|
||||
|
||||
uint32_t
|
||||
QueueManager::nextGeneration(uint32_t generation) const
|
||||
{
|
||||
if(m_maxGeneration == generation)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
return generation + 1;
|
||||
}
|
||||
|
||||
// Maximum number of elements this manager tracks (fixed at construction).
size_t
QueueManager::capacity() const
{
  return m_capacity;
}
|
||||
|
||||
int32_t
|
||||
QueueManager::circularDifference(uint32_t startingValue,
|
||||
uint32_t subtractValue, uint32_t modulo)
|
||||
{
|
||||
assert(modulo
|
||||
<= (static_cast< uint32_t >(std::numeric_limits< int32_t >::max())
|
||||
+ 1));
|
||||
assert(startingValue < modulo);
|
||||
assert(subtractValue < modulo);
|
||||
|
||||
int32_t difference = startingValue - subtractValue;
|
||||
if(difference > static_cast< int32_t >(modulo / 2))
|
||||
{
|
||||
return difference - modulo;
|
||||
}
|
||||
else if(difference < -static_cast< int32_t >(modulo / 2))
|
||||
{
|
||||
return difference + modulo;
|
||||
}
|
||||
else
|
||||
{
|
||||
return difference;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t
|
||||
QueueManager::numGenerations(size_t capacity)
|
||||
{
|
||||
assert(capacity != 0);
|
||||
|
||||
return static_cast< uint32_t >(
|
||||
std::min(NUM_COMBINED_INDEXES / capacity, NUM_ELEMENT_GENERATIONS));
|
||||
}
|
||||
|
||||
// Construct a manager for `capacity` cells. All cells start Empty in
// generation 0; both combined indexes start at 0.
QueueManager::QueueManager(size_t capacity)
    : m_pushIndex(0)
    , m_popIndex(0)
    , m_capacity(capacity)
    , m_maxGeneration(numGenerations(capacity) - 1)
    , m_maxCombinedIndex(
          numGenerations(capacity) * static_cast< uint32_t >(capacity) - 1)
{
  assert(0 < capacity);
  assert(capacity <= MAX_CAPACITY);
  // Padding members exist only for layout; silence unused warnings.
  (void)m_pushPadding;
  (void)m_popPadding;

  m_states = new std::atomic_uint32_t[capacity];

  // Zero every cell: generation 0, ElementState::Empty.
  for(std::atomic_uint32_t* cell = m_states; cell != m_states + capacity;
      ++cell)
  {
    *cell = 0;
  }
}
|
||||
|
||||
QueueManager::~QueueManager()
{
  // m_states was allocated with new[]; it must be released with
  // delete[] -- plain `delete` on an array is undefined behavior.
  delete[] m_states;
}
|
||||
|
||||
// Reserve the next cell to write into. On Success, `generation`/`index`
// identify the reserved cell, whose state has been CAS'd from Empty to
// Writing; the caller must follow up with commitPushIndex (or
// abortPushIndexReservation). Returns QueueDisabled if the disabled bit
// is set on the push index, QueueFull if no Empty cell is available.
QueueReturn
QueueManager::reservePushIndex(uint32_t& generation, uint32_t& index)
{
  uint32_t loadedPushIndex = pushIndex().load(std::memory_order_relaxed);

  // Sentinel (wraps to UINT32_MAX) so the first "full" observation never
  // matches and we retry at least once.
  uint32_t savedPushIndex = -1;

  uint32_t combinedIndex = 0;
  uint32_t currIdx = 0;
  uint32_t currGen = 0;

  // Use savedPushIndex to make us acquire an index at least twice before
  // returning QueueFull.
  // This prevents us from massive contention when we have a queue of size 1

  for(;;)
  {
    if(isDisabledFlagSet(loadedPushIndex))
    {
      return QueueReturn::QueueDisabled;
    }

    combinedIndex = discardDisabledFlag(loadedPushIndex);

    // Split the combined index into (generation, cell index).
    currGen = static_cast< uint32_t >(combinedIndex / m_capacity);
    currIdx = static_cast< uint32_t >(combinedIndex % m_capacity);

    // Attempt to claim the cell: Empty -> Writing in this generation.
    uint32_t compare = encodeElement(currGen, ElementState::Empty);
    const uint32_t swap = encodeElement(currGen, ElementState::Writing);

    if(m_states[currIdx].compare_exchange_strong(compare, swap))
    {
      // We changed the state.
      generation = currGen;
      index = currIdx;
      break;
    }

    // We failed to reserve the index. Use the result from cmp n swap to
    // determine if the queue was full or not. Either:
    // 1. The cell is from a previous generation (so the queue is full)
    // 2. Another thread has reserved this cell for writing, but not
    // committed yet
    // 3. The push index has been changed between the load and the cmp.

    uint32_t elemGen = decodeGenerationFromElementState(compare);

    int32_t difference = static_cast< int32_t >(currGen - elemGen);

    // difference == 1 (or the wrapped equivalent) means the cell still
    // belongs to the previous generation, i.e. it has not been consumed.
    if(difference == 1
       || (difference == -static_cast< int32_t >(m_maxGeneration)))
    {
      // Queue is full.

      assert(1
             == circularDifference(currGen, elemGen, m_maxGeneration + 1));

      ElementState state = decodeStateFromElementState(compare);

      if(state == ElementState::Reading)
      {
        // Another thread is reading. Yield this thread
        std::this_thread::yield();
        loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
        continue;
      }

      assert(state != ElementState::Empty);

      if(savedPushIndex != loadedPushIndex)
      {
        // Make another attempt to check the queue is full before failing
        std::this_thread::yield();
        savedPushIndex = loadedPushIndex;
        loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
        continue;
      }

      return QueueReturn::QueueFull;
    }

    // Another thread has already acquired this cell, try to increment the
    // push index and go again.

    assert(0 >= circularDifference(currGen, elemGen, m_maxGeneration + 1));

    const uint32_t next = nextCombinedIndex(combinedIndex);
    // Best-effort bump; if it fails another thread already advanced it.
    pushIndex().compare_exchange_strong(combinedIndex, next);
    loadedPushIndex = combinedIndex;
  }

  // We got the cell, increment the push index
  const uint32_t next = nextCombinedIndex(combinedIndex);
  pushIndex().compare_exchange_strong(combinedIndex, next);

  return QueueReturn::Success;
}
|
||||
|
||||
// Publish a previously reserved push slot: flips the cell from Writing to
// Full in the same generation, which unblocks readers spinning on it.
// Preconditions: the (generation, index) pair was returned by a
// successful reservePushIndex and has not been committed or aborted.
void
QueueManager::commitPushIndex(uint32_t generation, uint32_t index)
{
  assert(generation <= m_maxGeneration);
  assert(index < m_capacity);
  assert(ElementState::Writing
         == decodeStateFromElementState(m_states[index]));
  assert(generation == decodeGenerationFromElementState(m_states[index]));

  m_states[index] = encodeElement(generation, ElementState::Full);
}
|
||||
|
||||
// Reserve the next cell to pop from. On Success, `generation`/`index`
// identify the reserved cell, whose state has been CAS'd from Full to
// Reading; the caller must follow up with commitPopIndex. Returns
// QueueEmpty when no Full cell is available.
QueueReturn
QueueManager::reservePopIndex(uint32_t& generation, uint32_t& index)
{
  uint32_t loadedPopIndex = popIndex().load();
  // Sentinel (wraps to UINT32_MAX) so the first "empty" observation never
  // matches and we retry at least once (mirrors reservePushIndex).
  uint32_t savedPopIndex = -1;

  uint32_t currIdx = 0;
  uint32_t currGen = 0;

  for(;;)
  {
    currGen = static_cast< uint32_t >(loadedPopIndex / m_capacity);
    currIdx = static_cast< uint32_t >(loadedPopIndex % m_capacity);

    // Try to swap this state from full to reading.

    uint32_t compare = encodeElement(currGen, ElementState::Full);
    const uint32_t swap = encodeElement(currGen, ElementState::Reading);

    if(m_states[currIdx].compare_exchange_strong(compare, swap))
    {
      generation = currGen;
      index = currIdx;
      break;
    }

    // We failed to reserve the index. Use the result from cmp n swap to
    // determine if the queue was empty or not. Either:
    // 1. The cell is from a previous generation (so the queue is empty)
    // 2. The cell is from the current generation and empty (so the queue is
    // empty)
    // 3. The queue is being written to
    // 4. The pop index has been changed between the load and the cmp.

    uint32_t elemGen = decodeGenerationFromElementState(compare);
    ElementState state = decodeStateFromElementState(compare);

    int32_t difference = static_cast< int32_t >(currGen - elemGen);

    if(difference == 1
       || (difference == -static_cast< int32_t >(m_maxGeneration)))
    {
      // Queue is empty: the cell still belongs to the previous generation.
      assert(state == ElementState::Reading);
      // BUGFIX: the modulo is the generation count (m_maxGeneration + 1),
      // matching the equivalent assert in reservePushIndex. The previous
      // code added 1 outside the call -- circularDifference(currGen,
      // elemGen, m_maxGeneration) -- which both checked the wrong value
      // and could violate circularDifference's own preconditions when
      // currGen == m_maxGeneration.
      assert(1 == circularDifference(currGen, elemGen, m_maxGeneration + 1));

      return QueueReturn::QueueEmpty;
    }

    if(difference == 0 && state == ElementState::Empty)
    {
      // The cell is empty in the current generation, so the queue is empty

      if(savedPopIndex != loadedPopIndex)
      {
        // Re-check once more before reporting empty to avoid spurious
        // failures under contention.
        std::this_thread::yield();
        savedPopIndex = loadedPopIndex;
        loadedPopIndex = popIndex().load(std::memory_order_relaxed);
        continue;
      }

      return QueueReturn::QueueEmpty;
    }

    if(difference != 0 || state == ElementState::Writing)
    {
      // The cell is currently being written to or the index is outdated.
      // Yield and try again.
      std::this_thread::yield();
      loadedPopIndex = popIndex().load(std::memory_order_relaxed);
      continue;
    }

    // Another thread grabbed this cell; help advance the pop index.
    popIndex().compare_exchange_strong(loadedPopIndex,
                                       nextCombinedIndex(loadedPopIndex));
  }

  // We reserved the cell; advance the pop index past it (best effort).
  popIndex().compare_exchange_strong(loadedPopIndex,
                                     nextCombinedIndex(loadedPopIndex));

  return QueueReturn::Success;
}
|
||||
|
||||
// Release a previously reserved pop slot: flips the cell from Reading to
// Empty in the *next* generation, making it available to a future push.
// Preconditions: the (generation, index) pair was returned by a
// successful reservePopIndex (or reservePopForClear).
void
QueueManager::commitPopIndex(uint32_t generation, uint32_t index)
{
  assert(generation <= m_maxGeneration);
  assert(index < m_capacity);
  assert(decodeStateFromElementState(m_states[index])
         == ElementState::Reading);
  assert(generation == decodeGenerationFromElementState(m_states[index]));

  m_states[index] =
      encodeElement(nextGeneration(generation), ElementState::Empty);
}
|
||||
|
||||
void
|
||||
QueueManager::disable()
|
||||
{
|
||||
// Loop until we set the disabled bit
|
||||
for(;;)
|
||||
{
|
||||
uint32_t index = pushIndex();
|
||||
|
||||
if(isDisabledFlagSet(index))
|
||||
{
|
||||
// Queue is already disabled(?!)
|
||||
return;
|
||||
}
|
||||
|
||||
if(pushIndex().compare_exchange_strong(index,
|
||||
index | DISABLED_STATE_MASK))
|
||||
{
|
||||
// queue has been disabled
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
QueueManager::enable()
|
||||
{
|
||||
for(;;)
|
||||
{
|
||||
uint32_t index = pushIndex();
|
||||
|
||||
if(!isDisabledFlagSet(index))
|
||||
{
|
||||
// queue is already enabled.
|
||||
return;
|
||||
}
|
||||
|
||||
if(pushIndex().compare_exchange_strong(index,
|
||||
index & ~DISABLED_STATE_MASK))
|
||||
{
|
||||
// queue has been enabled
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
QueueManager::reservePopForClear(uint32_t& generation, uint32_t& index,
|
||||
uint32_t endGeneration, uint32_t endIndex)
|
||||
{
|
||||
assert(endGeneration <= m_maxGeneration);
|
||||
assert(endIndex < m_capacity);
|
||||
|
||||
uint32_t loadedCombinedIndex = popIndex().load(std::memory_order_relaxed);
|
||||
|
||||
for(;;)
|
||||
{
|
||||
u_int32_t endCombinedIndex =
|
||||
(endGeneration * static_cast< uint32_t >(m_capacity)) + endIndex;
|
||||
|
||||
if(circularDifference(endCombinedIndex, loadedCombinedIndex,
|
||||
m_maxCombinedIndex + 1)
|
||||
== 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
assert(0 < circularDifference(endCombinedIndex, loadedCombinedIndex,
|
||||
m_maxCombinedIndex + 1));
|
||||
|
||||
u_int32_t currIdx =
|
||||
static_cast< uint32_t >(loadedCombinedIndex % m_capacity);
|
||||
u_int32_t currGen =
|
||||
static_cast< uint32_t >(loadedCombinedIndex / m_capacity);
|
||||
|
||||
// Try to swap this cell from Full to Reading.
|
||||
// We only set this to Empty after trying to increment popIndex, so we
|
||||
// don't race against another thread.
|
||||
|
||||
uint32_t compare = encodeElement(currGen, ElementState::Full);
|
||||
const uint32_t swap = encodeElement(currGen, ElementState::Reading);
|
||||
|
||||
if(m_states[currIdx].compare_exchange_strong(compare, swap))
|
||||
{
|
||||
// We've dropped this index.
|
||||
|
||||
generation = currGen;
|
||||
index = currIdx;
|
||||
break;
|
||||
}
|
||||
|
||||
ElementState state = decodeStateFromElementState(compare);
|
||||
|
||||
if(state == ElementState::Writing || state == ElementState::Full)
|
||||
{
|
||||
// Another thread is writing to this cell, or this thread has slept
|
||||
// for too long.
|
||||
std::this_thread::yield();
|
||||
loadedCombinedIndex = popIndex().load(std::memory_order_relaxed);
|
||||
continue;
|
||||
}
|
||||
|
||||
const uint32_t next = nextCombinedIndex(loadedCombinedIndex);
|
||||
popIndex().compare_exchange_strong(loadedCombinedIndex, next);
|
||||
}
|
||||
|
||||
// Attempt to increment the index.
|
||||
const uint32_t next = nextCombinedIndex(loadedCombinedIndex);
|
||||
popIndex().compare_exchange_strong(loadedCombinedIndex, next);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Abandon a reserved push slot without publishing an element. The cell
// passes through Reading (so concurrent poppers treat it consistently),
// the pop index is advanced past it, and the cell is finally recycled as
// Empty in the next generation.
// Preconditions: the reservation at (generation, index) is held by this
// caller, and the pop index currently points at this exact cell.
void
QueueManager::abortPushIndexReservation(uint32_t generation, uint32_t index)
{
  assert(generation <= m_maxGeneration);
  assert(index < m_capacity);
  assert(static_cast< uint32_t >((generation * m_capacity) + index)
         == popIndex().load(std::memory_order_relaxed));
  assert(decodeStateFromElementState(m_states[index])
         == ElementState::Writing);
  assert(generation == decodeGenerationFromElementState(m_states[index]));

  uint32_t loadedPopIndex = popIndex().load(std::memory_order_relaxed);

  assert(generation == loadedPopIndex / m_capacity);
  assert(index == loadedPopIndex % m_capacity);

  // Mark as Reading first so the cell is not observed as writable or
  // readable while we advance the pop index.
  m_states[index] = encodeElement(generation, ElementState::Reading);

  // Best-effort advance of the pop index past the aborted cell.
  const uint32_t nextIndex = nextCombinedIndex(loadedPopIndex);
  popIndex().compare_exchange_strong(loadedPopIndex, nextIndex);

  // Recycle the cell for the next generation.
  m_states[index] =
      encodeElement(nextGeneration(generation), ElementState::Empty);
}
|
||||
|
||||
// Best-effort count of elements currently in the queue, computed from
// the distance between the push and pop combined indexes. The two
// indexes are read at different instants, so under concurrent use the
// result is only an instantaneous approximation.
size_t
QueueManager::size() const
{
  // Note that we rely on these loads being sequentially consistent.

  uint32_t combinedPushIndex = discardDisabledFlag(pushIndex());
  uint32_t combinedPopIndex = popIndex();

  int32_t difference = combinedPushIndex - combinedPopIndex;

  if(difference >= 0)
  {
    if(difference > static_cast< int32_t >(m_capacity))
    {
      // We've raced between getting push and pop indexes, in this case, it
      // means the queue is empty.
      assert(0 > circularDifference(combinedPushIndex, combinedPopIndex,
                                    m_maxCombinedIndex + 1));

      return 0;
    }

    return static_cast< size_t >(difference);
  }

  // Negative raw difference: either the push index wrapped around
  // (large negative, queue non-empty) or we raced (small negative).
  if(difference < -static_cast< int32_t >(m_maxCombinedIndex / 2))
  {
    assert(0 < circularDifference(combinedPushIndex, combinedPopIndex,
                                  m_maxCombinedIndex + 1));

    difference += m_maxCombinedIndex + 1;
    return std::min(static_cast< size_t >(difference), m_capacity);
  }

  return 0;
}
|
||||
|
||||
bool
|
||||
QueueManager::enabled() const
|
||||
{
|
||||
return !isDisabledFlagSet(pushIndex().load());
|
||||
}
|
||||
} // namespace thread
|
||||
} // namespace llarp
|
212
llarp/queue_manager.hpp
Normal file
212
llarp/queue_manager.hpp
Normal file
|
@ -0,0 +1,212 @@
|
|||
#ifndef LLARP_QUEUE_MANAGER_HPP
|
||||
#define LLARP_QUEUE_MANAGER_HPP
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace thread
|
||||
{
|
||||
// Per-cell lifecycle state, stored in the low 2 bits of each entry of
// QueueManager's state array. Transitions (see queue_manager.cpp):
// Empty -> Writing (reservePushIndex) -> Full (commitPushIndex)
//       -> Reading (reservePopIndex) -> Empty in the next generation
//       (commitPopIndex).
enum class ElementState : uint32_t
{
  Empty   = 0,  // slot available for a producer
  Writing = 1,  // reserved by a producer, not yet committed
  Full    = 2,  // committed element, available to a consumer
  Reading = 3   // reserved by a consumer, not yet committed
};
|
||||
|
||||
// Status codes returned by QueueManager's reserve operations.
enum class QueueReturn
{
  Success,        // an index was reserved
  QueueDisabled,  // the queue's disabled bit is set; no reservations
  QueueEmpty,     // no committed element available to pop
  QueueFull       // no empty cell available to push into
};
|
||||
|
||||
inline std::ostream&
|
||||
operator<<(std::ostream& os, QueueReturn val)
|
||||
{
|
||||
switch(val)
|
||||
{
|
||||
case QueueReturn::Success:
|
||||
os << "Success";
|
||||
break;
|
||||
case QueueReturn::QueueDisabled:
|
||||
os << "QueueDisabled";
|
||||
break;
|
||||
case QueueReturn::QueueEmpty:
|
||||
os << "QueueEmpty";
|
||||
break;
|
||||
case QueueReturn::QueueFull:
|
||||
os << "QueueFull";
|
||||
break;
|
||||
}
|
||||
|
||||
return os;
|
||||
}
|
||||
|
||||
class QueueManager
{
  // This class provides thread-safe state management for a queue.
  // It tracks only indexes and per-cell states -- the element storage
  // itself lives in the owning queue.

  // Common terminology in this class:
  // - "Combined Index": the combination of an index into the circular
  //   buffer and the generation count. Precisely:
  //
  //   Combined Index = (Generation * Capacity) + Element Index
  //
  //   The combined index has the useful property where incrementing the
  //   index when the element index is at the end of the buffer does two
  //   things:
  //   1. Sets the element index back to 0
  //   2. Increments the generation

 public:
  // Padding target: keeps the push and pop indexes in separate 64-byte
  // regions so writers and readers do not share a cache line.
  static constexpr size_t Alignment = 64;

  using AtomicIndex = std::atomic_uint32_t;

 private:
  AtomicIndex m_pushIndex;  // Index in the buffer that the next
                            // element will be added to.
                            // The top bit doubles as the queue-wide
                            // disabled flag.

  char m_pushPadding[Alignment - sizeof(AtomicIndex)];

  AtomicIndex m_popIndex;  // Index in the buffer that the next
                           // element will be removed from.

  char m_popPadding[Alignment - sizeof(AtomicIndex)];

  const size_t m_capacity;  // max size of the manager.

  const uint32_t m_maxGeneration;  // Maximum generation for this object.

  const uint32_t m_maxCombinedIndex;  // Maximum combined value of index and
                                      // generation for this object.

  std::atomic_uint32_t* m_states;  // Array of index states, one per cell;
                                   // allocated in the constructor and
                                   // owned by this object.

  // Accessors for the two atomic indexes (mutable and const overloads).
  AtomicIndex&
  pushIndex();

  AtomicIndex&
  popIndex();

  const AtomicIndex&
  pushIndex() const;

  const AtomicIndex&
  popIndex() const;

  // Return the next combined index (wraps at m_maxCombinedIndex)
  uint32_t
  nextCombinedIndex(uint32_t index) const;

  // Return the next generation (wraps at m_maxGeneration)
  uint32_t
  nextGeneration(uint32_t generation) const;

 public:
  // Return the difference between the startingValue and the subtractValue
  // around a particular modulo.
  static int32_t
  circularDifference(uint32_t startingValue, uint32_t subtractValue,
                     uint32_t modulo);

  // Return the number of possible generations a circular buffer can hold.
  static uint32_t
  numGenerations(size_t capacity);

  // The max capacity of the queue manager.
  // 2 bits are used for holding the disabled status and the number of
  // generations is at least 2.
  static constexpr size_t MAX_CAPACITY = 1 << ((sizeof(uint32_t) * 8) - 2);

  explicit QueueManager(size_t capacity);

  ~QueueManager();

  // Push operations

  // Reserve the next available index to enqueue an element at. On success:
  // - Load `index` with the next available index
  // - Load `generation` with the current generation
  //
  // If this call succeeds, other threads may spin until `commitPushIndex`
  // is called.
  QueueReturn
  reservePushIndex(uint32_t& generation, uint32_t& index);

  // Mark the `index` in the given `generation` as in-use. This unblocks
  // any other threads which were waiting on the index state.
  void
  commitPushIndex(uint32_t generation, uint32_t index);

  // Pop operations

  // Reserve the next available index to remove an element from. On success:
  // - Load `index` with the next available index
  // - Load `generation` with the current generation
  //
  // If this call succeeds, other threads may spin until `commitPopIndex`
  // is called.
  QueueReturn
  reservePopIndex(uint32_t& generation, uint32_t& index);

  // Mark the `index` in the given `generation` as available. This unblocks
  // any other threads which were waiting on the index state.
  void
  commitPopIndex(uint32_t generation, uint32_t index);

  // Disable the queue
  void
  disable();

  // Enable the queue
  void
  enable();

  // Exception safety

  // If the next available index an element can be popped from is before
  // the `endGeneration` and the `endIndex`, reserve that index into `index`
  // and `generation`.
  //
  // Return true if an index was reserved and false otherwise.
  //
  // Behaviour is undefined if `endGeneration` and `endIndex` have not been
  // acquired for writing.
  //
  // The intended usage of this method is to help remove all elements if an
  // exception is thrown between reserving and committing an index.
  // Workflow:
  // 1. call reservePopForClear
  // 2. call commitPopIndex, emptying all cells up to the reserved index
  // 3. call abortPushIndexReservation on the index.
  bool
  reservePopForClear(uint32_t& generation, uint32_t& index,
                     uint32_t endGeneration, uint32_t endIndex);

  void
  abortPushIndexReservation(uint32_t generation, uint32_t index);

  // Accessors

  bool
  enabled() const;

  // Instantaneous (approximate under concurrency) element count.
  size_t
  size() const;

  size_t
  capacity() const;
};
|
||||
} // namespace thread
|
||||
} // namespace llarp
|
||||
#endif
|
1125
test/test_llarp_queue_manager.cpp
Normal file
1125
test/test_llarp_queue_manager.cpp
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue