Convert to use abseil synchronisation primitives

This commit is contained in:
Michael 2019-03-03 15:01:05 +00:00
parent 0a44d1b730
commit c5a129ddff
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C
20 changed files with 166 additions and 259 deletions

View File

@ -186,7 +186,8 @@ endif(SHADOW)
if(CMAKE_BUILD_TYPE MATCHES "[Dd][Ee][Bb][Uu][Gg]")
set(OPTIMIZE_FLAGS "")
add_compile_options( ${DEBUG_FLAGS} )
add_compile_options( ${DEBUG_FLAGS})
add_definitions(-DLOKINET_DEBUG=1)
link_libraries( ${DEBUG_FLAGS} )
endif(CMAKE_BUILD_TYPE MATCHES "[Dd][Ee][Bb][Uu][Gg]")

View File

@ -100,6 +100,10 @@ main(int argc, char *argv[])
SetConsoleCtrlHandler(handle_signal_win32, TRUE);
#endif
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
int opt = 0;
bool genconfigOnly = false;
bool asRouter = false;

View File

@ -257,6 +257,10 @@ main(int argc, char *argv[])
return 1;
}
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
llarp::RouterContact tmp;
if(verifyMode)

View File

@ -1,6 +1,8 @@
#include <libabyss.hpp>
#include <net/net.hpp>
#include <absl/synchronization/mutex.h>
#ifndef _WIN32
#include <sys/signal.h>
#endif
@ -120,6 +122,10 @@ main(__attribute__((unused)) int argc, __attribute__((unused)) char* argv[])
return err;
}
#endif
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop* loop = nullptr;

View File

@ -33,8 +33,8 @@ set(LIB_UTIL_SRC
)
add_library(${UTIL_LIB} STATIC ${LIB_UTIL_SRC})
target_include_directories(${UTIL_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(${UTIL_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_include_directories(${UTIL_LIB} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(${UTIL_LIB} PUBLIC absl::synchronization)
# cut back on fluff
if (NOT WIN32)

View File

@ -170,8 +170,8 @@ namespace llarp
using Pkt_t = net::IPv4Packet;
using PacketQueue_t =
util::CoDelQueue< Pkt_t, Pkt_t::GetTime, Pkt_t::PutTime,
Pkt_t::CompareOrder, Pkt_t::GetNow,
util::DummyMutex, util::DummyLock, 5, 100, 1024 >;
Pkt_t::CompareOrder, Pkt_t::GetNow, util::NullMutex,
util::NullLock, 5, 100, 1024 >;
/// internet to llarp packet queue
PacketQueue_t m_InetToNetwork;

View File

@ -28,7 +28,7 @@ namespace llarp
bool
ILinkLayer::HasSessionTo(const RouterID& id)
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
return m_AuthedLinks.find(id) != m_AuthedLinks.end();
}
@ -91,7 +91,7 @@ namespace llarp
{
auto _now = Now();
{
Lock lock(m_AuthedLinksMutex);
Lock lock(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -109,7 +109,7 @@ namespace llarp
}
}
{
Lock lock(m_PendingMutex);
Lock lock(&m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
@ -129,8 +129,8 @@ namespace llarp
ILinkLayer::MapAddr(const RouterID& pk, ILinkSession* s)
{
static constexpr size_t MaxSessionsPerKey = 16;
Lock l_authed(m_AuthedLinksMutex);
Lock l_pending(m_PendingMutex);
Lock l_authed(&m_AuthedLinksMutex);
Lock l_pending(&m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if(itr != m_Pending.end())
@ -223,7 +223,7 @@ namespace llarp
void
ILinkLayer::Tick(llarp_time_t now)
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -238,7 +238,7 @@ namespace llarp
if(m_Logic && tick_id)
m_Logic->remove_call(tick_id);
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -247,7 +247,7 @@ namespace llarp
}
}
{
Lock l(m_PendingMutex);
Lock l(&m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
@ -260,7 +260,7 @@ namespace llarp
void
ILinkLayer::CloseSessionTo(const RouterID& remote)
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto range = m_AuthedLinks.equal_range(r);
@ -275,7 +275,7 @@ namespace llarp
void
ILinkLayer::KeepAliveSessionTo(const RouterID& remote)
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
while(itr != range.second)
@ -290,7 +290,7 @@ namespace llarp
{
ILinkSession* s = nullptr;
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
// pick lowest backlog session
@ -365,7 +365,7 @@ namespace llarp
bool
ILinkLayer::PutSession(ILinkSession* s)
{
Lock lock(m_PendingMutex);
Lock lock(&m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if(itr != m_Pending.end())

View File

@ -487,7 +487,7 @@ namespace llarp
#endif
std::set< RouterID > sessions;
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -497,7 +497,7 @@ namespace llarp
}
ILinkLayer::Pump();
{
Lock l(m_AuthedLinksMutex);
Lock l(&m_AuthedLinksMutex);
for(const auto& pk : sessions)
{
if(m_AuthedLinks.count(pk) == 0)

View File

@ -18,7 +18,7 @@ static const std::string RC_FILE_EXT = ".signed";
bool
llarp_nodedb::Remove(const llarp::RouterID &pk)
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
auto itr = entries.find(pk);
if(itr == entries.end())
return false;
@ -30,7 +30,7 @@ llarp_nodedb::Remove(const llarp::RouterID &pk)
void
llarp_nodedb::Clear()
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
entries.clear();
}
@ -38,9 +38,11 @@ bool
llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result,
bool lock)
{
std::unique_ptr< llarp::util::Lock > l;
absl::optional< llarp::util::Lock > l;
if(lock)
l.reset(new llarp::util::Lock(access));
{
l.emplace(&access);
}
auto itr = entries.find(pk);
if(itr == entries.end())
return false;
@ -51,7 +53,7 @@ llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result,
bool
llarp_nodedb::Has(const llarp::RouterID &pk)
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
return entries.find(pk) != entries.end();
}
@ -108,7 +110,7 @@ llarp_nodedb::Insert(const llarp::RouterContact &rc)
std::array< byte_t, MAX_RC_SIZE > tmp;
llarp_buffer_t buf(tmp);
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
auto itr = entries.find(rc.pubkey.as_array());
if(itr != entries.end())
entries.erase(itr);
@ -188,7 +190,7 @@ llarp_nodedb::loadfile(const fs::path &fpath)
return false;
}
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
entries.emplace(rc.pubkey.as_array(), rc);
}
return true;
@ -197,7 +199,7 @@ llarp_nodedb::loadfile(const fs::path &fpath)
void
llarp_nodedb::visit(std::function< bool(const llarp::RouterContact &) > visit)
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
auto itr = entries.begin();
while(itr != entries.end())
{
@ -211,7 +213,7 @@ bool
llarp_nodedb::iterate(llarp_nodedb_iter &i)
{
i.index = 0;
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
auto itr = entries.begin();
while(itr != entries.end())
{
@ -394,7 +396,7 @@ llarp_nodedb::num_loaded() const
bool
llarp_nodedb::select_random_exit(llarp::RouterContact &result)
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
const auto sz = entries.size();
auto itr = entries.begin();
if(sz < 3)
@ -429,7 +431,7 @@ bool
llarp_nodedb::select_random_hop(const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N)
{
llarp::util::Lock lock(access);
llarp::util::Lock lock(&access);
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
size_t sz = entries.size();

View File

@ -96,7 +96,7 @@ namespace llarp
IHopHandler*
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
util::Lock lock(map.first);
util::Lock lock(&map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -110,7 +110,7 @@ namespace llarp
bool
MapHas(Map_t& map, const Key_t& k, CheckValue_t check)
{
util::Lock lock(map.first);
util::Lock lock(&map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -124,7 +124,7 @@ namespace llarp
void
MapPut(Map_t& map, const Key_t& k, const Value_t& v)
{
util::Lock lock(map.first);
util::Lock lock(&map.first);
map.second.emplace(k, v);
}
@ -172,31 +172,33 @@ namespace llarp
IHopHandler*
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
auto own = MapGet(m_OurPaths, id,
[](__attribute__((unused)) const PathSet* s) -> bool {
// TODO: is this right?
return true;
},
[remote, id](PathSet* p) -> IHopHandler* {
return p->GetByUpstream(remote, id);
});
auto own = MapGet(
m_OurPaths, id,
[](__attribute__((unused)) const PathSet* s) -> bool {
// TODO: is this right?
return true;
},
[remote, id](PathSet* p) -> IHopHandler* {
return p->GetByUpstream(remote, id);
});
if(own)
return own;
return MapGet(m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.upstream == remote;
},
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
return MapGet(
m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.upstream == remote;
},
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
}
bool
PathContext::TransitHopPreviousIsRouter(const PathID_t& path,
const RouterID& otherRouter)
{
util::Lock lock(m_TransitPaths.first);
util::Lock lock(&m_TransitPaths.first);
auto itr = m_TransitPaths.second.find(path);
if(itr == m_TransitPaths.second.end())
return false;
@ -206,20 +208,21 @@ namespace llarp
IHopHandler*
PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id)
{
return MapGet(m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.downstream == remote;
},
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
return MapGet(
m_TransitPaths, id,
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.downstream == remote;
},
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
}
PathSet*
PathContext::GetLocalPathSet(const PathID_t& id)
{
auto& map = m_OurPaths;
util::Lock lock(map.first);
util::Lock lock(&map.first);
auto itr = map.second.find(id);
if(itr != map.second.end())
{
@ -246,7 +249,7 @@ namespace llarp
RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
util::Lock lock(map.first);
util::Lock lock(&map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
@ -267,7 +270,7 @@ namespace llarp
void
PathContext::ExpirePaths(llarp_time_t now)
{
util::Lock lock(m_TransitPaths.first);
util::Lock lock(&m_TransitPaths.first);
auto& map = m_TransitPaths.second;
auto itr = map.begin();
while(itr != map.end())
@ -320,7 +323,7 @@ namespace llarp
RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
util::Lock lock(map.first);
util::Lock lock(&map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
@ -340,7 +343,7 @@ namespace llarp
void
PathContext::RemovePathSet(PathSet* set)
{
util::Lock lock(m_OurPaths.first);
util::Lock lock(&m_OurPaths.first);
auto& map = m_OurPaths.second;
auto itr = map.begin();
while(itr != map.end())

View File

@ -16,14 +16,14 @@ namespace llarp
PathSet::ShouldBuildMore(llarp_time_t now) const
{
(void)now;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
return m_Paths.size() < m_NumPaths;
}
bool
PathSet::ShouldBuildMoreForRoles(llarp_time_t now, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
const size_t required = MinRequiredForRoles(roles);
size_t has = 0;
for(const auto& item : m_Paths)
@ -48,7 +48,7 @@ namespace llarp
PathSet::NumPathsExistingAt(llarp_time_t futureTime) const
{
size_t num = 0;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
for(const auto& item : m_Paths)
{
if(!item.second->Expired(futureTime))
@ -60,7 +60,7 @@ namespace llarp
void
PathSet::Tick(llarp_time_t now, AbstractRouter* r)
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
for(auto& item : m_Paths)
{
item.second->Tick(now, r);
@ -70,7 +70,7 @@ namespace llarp
void
PathSet::ExpirePaths(llarp_time_t now)
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
if(m_Paths.size() == 0)
return;
auto itr = m_Paths.begin();
@ -89,7 +89,7 @@ namespace llarp
Path*
PathSet::GetEstablishedPathClosestTo(RouterID id, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
Path* path = nullptr;
AlignedBuffer< 32 > dist;
AlignedBuffer< 32 > to = id;
@ -113,7 +113,7 @@ namespace llarp
Path*
PathSet::GetNewestPathByRouter(RouterID id, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
Path* chosen = nullptr;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -136,7 +136,7 @@ namespace llarp
Path*
PathSet::GetPathByRouter(RouterID id, PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
Path* chosen = nullptr;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -159,7 +159,7 @@ namespace llarp
Path*
PathSet::GetPathByID(PathID_t id) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -173,7 +173,7 @@ namespace llarp
size_t
PathSet::AvailablePaths(PathRole roles) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
size_t count = 0;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -189,7 +189,7 @@ namespace llarp
size_t
PathSet::NumInStatus(PathStatus st) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
size_t count = 0;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -204,7 +204,7 @@ namespace llarp
void
PathSet::AddPath(Path* path)
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto upstream = path->Upstream(); // RouterID
auto RXID = path->RXID(); // PathID
m_Paths.emplace(std::make_pair(upstream, RXID), path);
@ -213,14 +213,14 @@ namespace llarp
void
PathSet::RemovePath(Path* path)
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
m_Paths.erase({path->Upstream(), path->RXID()});
}
Path*
PathSet::GetByUpstream(RouterID remote, PathID_t rxid) const
{
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.find({remote, rxid});
if(itr == m_Paths.end())
return nullptr;
@ -239,7 +239,7 @@ namespace llarp
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -259,7 +259,7 @@ namespace llarp
{
intros.clear();
size_t count = 0;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -284,7 +284,7 @@ namespace llarp
{
intro.Clear();
bool found = false;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -303,7 +303,7 @@ namespace llarp
PathSet::PickRandomEstablishedPath(PathRole roles) const
{
std::vector< Path* > established;
Lock_t l(m_PathsMutex);
Lock_t l(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{

View File

@ -199,7 +199,7 @@ namespace llarp
void
ForEachPath(std::function< void(Path*) > visit)
{
Lock_t lock(m_PathsMutex);
Lock_t lock(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -211,7 +211,7 @@ namespace llarp
void
ForEachPath(std::function< void(const Path*) > visit) const
{
Lock_t lock(m_PathsMutex);
Lock_t lock(&m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{

View File

@ -52,7 +52,7 @@ namespace llarp
bool
Profiling::IsBad(const RouterID& r, uint64_t chances)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
auto itr = m_Profiles.find(r);
if(itr == m_Profiles.end())
return false;
@ -62,21 +62,21 @@ namespace llarp
void
Profiling::MarkTimeout(const RouterID& r)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
m_Profiles[r].connectTimeoutCount += 1;
}
void
Profiling::MarkSuccess(const RouterID& r)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
m_Profiles[r].connectGoodCount += 1;
}
void
Profiling::MarkPathFail(path::Path* p)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
for(const auto& hop : p->hops)
{
// TODO: also mark bad?
@ -87,7 +87,7 @@ namespace llarp
void
Profiling::MarkPathSuccess(path::Path* p)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
for(const auto& hop : p->hops)
{
m_Profiles[hop.rc.pubkey].pathSuccessCount += 1;
@ -97,7 +97,7 @@ namespace llarp
bool
Profiling::Save(const char* fname)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
size_t sz = (m_Profiles.size() * (RouterProfile::MaxSize + 32 + 8)) + 8;
std::vector< byte_t > tmp(sz, 0);
@ -148,7 +148,7 @@ namespace llarp
bool
Profiling::Load(const char* fname)
{
lock_t lock(m_ProfilesMutex);
lock_t lock(&m_ProfilesMutex);
m_Profiles.clear();
if(!BDecodeReadFile(fname, *this))
{

View File

@ -16,19 +16,6 @@ namespace llarp
{
namespace util
{
struct DummyMutex
{
};
struct DummyLock
{
DummyLock(__attribute__((unused)) const DummyMutex& mtx){};
~DummyLock()
{
}
};
struct GetNowSyscall
{
llarp_time_t
@ -60,7 +47,7 @@ namespace llarp
bool
EmplaceIf(std::function< bool(T&) > pred, Args&&... args)
{
Lock_t lock(m_QueueMutex);
Lock_t lock(&m_QueueMutex);
if(m_QueueIdx == MaxSize)
return false;
T* t = &m_Queue[m_QueueIdx];
@ -83,7 +70,7 @@ namespace llarp
void
Emplace(Args&&... args)
{
Lock_t lock(m_QueueMutex);
Lock_t lock(&m_QueueMutex);
if(m_QueueIdx == MaxSize)
return;
T* t = &m_Queue[m_QueueIdx];
@ -109,7 +96,7 @@ namespace llarp
if(_getNow() < nextTickAt)
return;
// llarp::LogInfo("CoDelQueue::Process - start at ", start);
Lock_t lock(m_QueueMutex);
Lock_t lock(&m_QueueMutex);
auto start = firstPut;
if(m_QueueIdx == 1)

View File

@ -1,6 +1,8 @@
#ifndef LLARP_THREADING_HPP
#define LLARP_THREADING_HPP
#include <mutex>
#include <absl/synchronization/barrier.h>
#include <absl/synchronization/mutex.h>
// We only support posix threads:
// MSYS2 has a full native C++11 toolset, and a suitable
// cross-compilation system can be assembled on Linux and UNIX
@ -24,70 +26,20 @@ namespace llarp
/// a lock that does nothing
struct NullLock
{
NullLock(__attribute__((unused)) NullMutex& mtx)
NullLock(__attribute__((unused)) NullMutex* mtx)
{
}
};
using mtx_t = std::mutex;
using lock_t = std::unique_lock< std::mutex >;
using cond_t = std::condition_variable;
struct Mutex
{
mtx_t impl;
};
/// aqcuire a lock on a mutex
struct Lock
{
Lock(Mutex& mtx) : impl(mtx.impl)
{
}
lock_t impl;
};
struct Condition
{
cond_t impl;
void
NotifyAll()
{
impl.notify_all();
}
void
NotifyOne()
{
impl.notify_one();
}
void
Wait(Lock& lock)
{
impl.wait(lock.impl);
}
template < typename Interval >
void
WaitFor(Lock& lock, Interval i)
{
impl.wait_for(lock.impl, i);
}
template < typename Pred >
void
WaitUntil(Lock& lock, Pred p)
{
impl.wait(lock.impl, p);
}
};
using Mutex = absl::Mutex;
using Lock = absl::MutexLock;
using Condition = absl::CondVar;
class Semaphore
{
private:
std::mutex m_mutex;
std::condition_variable m_cv;
Mutex m_mutex;
Condition m_cv;
size_t m_count;
public:
@ -98,100 +50,44 @@ namespace llarp
void
notify()
{
std::unique_lock< std::mutex > lock(m_mutex);
Lock lock(&m_mutex);
m_count++;
m_cv.notify_one();
m_cv.Signal();
}
void
wait()
{
std::unique_lock< std::mutex > lock(m_mutex);
m_cv.wait(lock, [this]() { return this->m_count > 0; });
Lock lock(&m_mutex);
while(this->m_count == 0)
{
m_cv.Wait(&m_mutex);
}
m_count--;
}
template < typename Rep, typename Period >
bool
waitFor(const std::chrono::duration< Rep, Period >& period)
waitFor(absl::Duration timeout)
{
std::unique_lock< std::mutex > lock(m_mutex);
Lock lock(&m_mutex);
if(m_cv.wait_for(lock, period, [this]() { return this->m_count > 0; }))
{
m_count--;
return true;
}
return false;
}
};
class Barrier
{
private:
std::mutex mutex;
std::condition_variable cv;
const size_t numThreads;
size_t numThreadsWaiting; // number of threads to be woken
size_t sigCount; // number of times the barrier has been signalled
size_t numPending; // number of threads that have been signalled, but
// haven't woken.
public:
Barrier(size_t threadCount)
: numThreads(threadCount)
, numThreadsWaiting(0)
, sigCount(0)
, numPending(0)
{
}
~Barrier()
{
for(;;)
while(this->m_count == 0)
{
if(m_cv.WaitWithTimeout(&m_mutex, timeout))
{
std::unique_lock< std::mutex > lock(mutex);
if(numPending == 0)
{
break;
}
return false;
}
std::this_thread::yield();
}
assert(numThreadsWaiting == 0);
}
void
wait()
{
std::unique_lock< std::mutex > lock(mutex);
size_t signalCount = sigCount;
if(++numThreadsWaiting == numThreads)
{
++sigCount;
numPending += numThreads - 1;
numThreadsWaiting = 0;
cv.notify_all();
}
else
{
cv.wait(lock, [this, signalCount]() {
return this->sigCount != signalCount;
});
--numPending;
}
m_count--;
return true;
}
};
using Barrier = absl::Barrier;
} // namespace util
} // namespace llarp

View File

@ -62,7 +62,7 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
else
{
// single threaded mode
llarp::util::Lock lock(pool->m_access);
llarp::util::Lock lock(&pool->m_access);
pool->jobs.emplace(std::bind(job.work, job.user));
}
}
@ -74,7 +74,7 @@ llarp_threadpool_tick(struct llarp_threadpool *pool)
{
std::function< void(void) > job;
{
llarp::util::Lock lock(pool->m_access);
llarp::util::Lock lock(&pool->m_access);
job = std::move(pool->jobs.front());
pool->jobs.pop();
}

View File

@ -61,7 +61,7 @@ struct llarp_timer_context
std::priority_queue< std::unique_ptr< llarp::timer > > calling;
llarp::util::Mutex tickerMutex;
std::unique_ptr< llarp::util::Condition > ticker;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
absl::Duration nextTickLen = absl::Milliseconds(100);
llarp_time_t m_Now;
@ -92,7 +92,7 @@ struct llarp_timer_context
void
cancel(uint32_t id)
{
llarp::util::Lock lock(timersMutex);
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
if(itr == timers.end())
return;
@ -102,7 +102,7 @@ struct llarp_timer_context
void
remove(uint32_t id)
{
llarp::util::Lock lock(timersMutex);
llarp::util::Lock lock(&timersMutex);
const auto& itr = timers.find(id);
if(itr == timers.end())
return;
@ -113,7 +113,7 @@ struct llarp_timer_context
uint32_t
call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms)
{
llarp::util::Lock lock(timersMutex);
llarp::util::Lock lock(&timersMutex);
uint32_t id = ++ids;
timers.emplace(
@ -127,7 +127,7 @@ struct llarp_timer_context
std::list< uint32_t > ids;
{
llarp::util::Lock lock(timersMutex);
llarp::util::Lock lock(&timersMutex);
for(auto& item : timers)
{
@ -177,7 +177,7 @@ llarp_timer_stop(struct llarp_timer_context* t)
t->timers.clear();
t->stop();
if(t->ticker)
t->ticker->NotifyAll();
t->ticker->SignalAll();
}
void
@ -202,7 +202,7 @@ llarp_timer_tick_all(struct llarp_timer_context* t)
std::list< std::unique_ptr< llarp::timer > > hit;
{
llarp::util::Lock lock(t->timersMutex);
llarp::util::Lock lock(&t->timersMutex);
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
@ -250,13 +250,13 @@ llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
// wait for timer mutex
if(t->ticker)
{
llarp::util::Lock lock(t->tickerMutex);
t->ticker->WaitFor(lock, t->nextTickLen);
llarp::util::Lock lock(&t->tickerMutex);
t->ticker->WaitWithTimeout(&t->tickerMutex, t->nextTickLen);
}
if(t->run())
{
llarp::util::Lock lock(t->timersMutex);
llarp::util::Lock lock(&t->timersMutex);
// we woke up
llarp_timer_tick_all_async(t, pool, llarp::time_now_ms());
}

View File

@ -1,5 +1,7 @@
#include <gtest/gtest.h>
#include <absl/synchronization/mutex.h>
#ifdef _WIN32
#include <winsock2.h>
int
@ -24,10 +26,12 @@ main(int argc, char** argv)
if(startWinsock())
return -1;
#endif
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
::testing::InitGoogleTest(&argc, argv);
int r = RUN_ALL_TESTS();
#ifdef _WIN32
WSACleanup();
#endif
return r;
}
}

View File

@ -120,7 +120,7 @@ void
abaThread(char* firstValue, char* lastValue, Queue< char* >& queue,
util::Barrier& barrier)
{
barrier.wait();
barrier.Block();
for(char* val = firstValue; val <= lastValue; ++val)
{
@ -171,7 +171,7 @@ sleepNWait(size_t microseconds, util::Barrier& barrier)
std::this_thread::sleep_for(
std::chrono::duration< double, std::micro >(microseconds));
barrier.wait();
barrier.Block();
}
void
@ -505,14 +505,14 @@ TEST(TestQueue, exceptionSafety)
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_TRUE(semaphore.waitFor(absl::Seconds(1)));
ASSERT_EQ(queueSize, queue.size());
ASSERT_THROW({ (void)queue.popFront(); }, Exception);
// Now the queue is not full, and the producer thread can start adding items.
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_TRUE(semaphore.waitFor(absl::Seconds(1)));
ASSERT_EQ(queueSize, queue.size());
@ -522,7 +522,7 @@ TEST(TestQueue, exceptionSafety)
// pop an item to unblock the pusher
(void)queue.popFront();
ASSERT_TRUE(semaphore.waitFor(std::chrono::seconds{1}));
ASSERT_TRUE(semaphore.waitFor(absl::Seconds(1)));
ASSERT_EQ(1u, caught);

View File

@ -57,9 +57,9 @@ incrementFunction(PoolArgs& args)
void
barrierFunction(BarrierArgs& args)
{
args.startBarrier.wait();
args.startBarrier.Block();
args.count++;
args.stopBarrier.wait();
args.stopBarrier.Block();
}
void
@ -82,7 +82,7 @@ recurse(util::Barrier& barrier, std::atomic_size_t& counter, ThreadPool& pool,
std::ref(pool), depthLimit)));
}
barrier.wait();
barrier.Block();
}
class DestructiveObject
@ -98,7 +98,7 @@ class DestructiveObject
~DestructiveObject()
{
auto job = std::bind(&util::Barrier::wait, &barrier);
auto job = std::bind(&util::Barrier::Block, &barrier);
pool.addJob(job);
}
};
@ -384,7 +384,7 @@ TEST_P(TryAdd, noblocking)
}
// Wait for everything to start.
startBarrier.wait();
startBarrier.Block();
// and that we emptied the queue.
ASSERT_EQ(0u, pool.jobCount());
@ -402,7 +402,7 @@ TEST_P(TryAdd, noblocking)
ASSERT_FALSE(pool.tryAddJob(workJob));
// and finish
stopBarrier.wait();
stopBarrier.Block();
}
TEST(TestThreadPool, recurseJob)
@ -424,7 +424,7 @@ TEST(TestThreadPool, recurseJob)
ASSERT_TRUE(pool.addJob(std::bind(recurse, std::ref(barrier),
std::ref(counter), std::ref(pool), depth)));
barrier.wait();
barrier.Block();
ASSERT_EQ(depth, counter);
}
@ -447,5 +447,5 @@ TEST(TestThreadPool, destructors)
ASSERT_TRUE(pool.addJob(std::bind(destructiveJob, obj)));
}
barrier.wait();
barrier.Block();
}