From d823d6fa703cbe97c4d16e3d61d8767d565cb627 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 28 Nov 2019 16:12:46 -0500 Subject: [PATCH] only flush when no other jobs are executing --- llarp/path/transit_hop.cpp | 82 +++++++++++++++++--------------------- llarp/path/transit_hop.hpp | 2 + 2 files changed, 38 insertions(+), 46 deletions(-) diff --git a/llarp/path/transit_hop.cpp b/llarp/path/transit_hop.cpp index fd1b370d9..9d493ccd2 100644 --- a/llarp/path/transit_hop.cpp +++ b/llarp/path/transit_hop.cpp @@ -36,6 +36,8 @@ namespace llarp { m_UpstreamGather.enable(); m_DownstreamGather.enable(); + m_UpstreamWorkCounter = 0; + m_DownstreamWorkCounter = 0; } bool @@ -122,6 +124,18 @@ namespace llarp void TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { + m_DownstreamWorkCounter++; + auto flushIt = [self = shared_from_this(), r]() { + std::vector< RelayDownstreamMessage > msgs; + do + { + auto maybe = self->m_DownstreamGather.tryPopFront(); + if(not maybe.has_value()) + break; + msgs.emplace_back(maybe.value()); + } while(true); + self->HandleAllDownstream(std::move(msgs), r); + }; for(auto& ev : *msgs) { RelayDownstreamMessage msg; @@ -132,27 +146,32 @@ namespace llarp msg.X = buf; llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ", info.upstream, " to ", info.downstream); - if(m_DownstreamGather.empty() || m_DownstreamGather.full()) + if(m_DownstreamGather.full()) { - LogicCall(r->logic(), [self = shared_from_this(), r]() { - std::vector< RelayDownstreamMessage > msgs; - do - { - auto maybe = self->m_DownstreamGather.tryPopFront(); - if(not maybe.has_value()) - break; - msgs.emplace_back(maybe.value()); - } while(true); - self->HandleAllDownstream(std::move(msgs), r); - }); + LogicCall(r->logic(), flushIt); } m_DownstreamGather.pushBack(msg); } + m_DownstreamWorkCounter--; + if(m_DownstreamWorkCounter == 0) + flushIt(); } void TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r) { + m_UpstreamWorkCounter++; + auto flushIt = [self = shared_from_this(), r]() { + std::vector< RelayUpstreamMessage > msgs; + do + { + auto maybe = self->m_UpstreamGather.tryPopFront(); + if(not maybe.has_value()) + break; + msgs.emplace_back(maybe.value()); + } while(true); + self->HandleAllUpstream(std::move(msgs), r); + }; for(auto& ev : *msgs) { const llarp_buffer_t buf(ev.first); @@ -161,22 +180,15 @@ namespace llarp msg.pathid = info.txID; msg.Y = ev.second ^ nonceXOR; msg.X = buf; - if(m_UpstreamGather.empty() || m_UpstreamGather.full()) + if(m_UpstreamGather.full()) { - LogicCall(r->logic(), [self = shared_from_this(), r]() { - std::vector< RelayUpstreamMessage > msgs; - do - { - auto maybe = self->m_UpstreamGather.tryPopFront(); - if(not maybe.has_value()) - break; - msgs.emplace_back(maybe.value()); - } while(true); - self->HandleAllUpstream(std::move(msgs), r); - }); + LogicCall(r->logic(), flushIt); } m_UpstreamGather.pushBack(msg); } + m_UpstreamWorkCounter--; + if(m_UpstreamWorkCounter == 0) + flushIt(); } void @@ -235,17 +247,6 @@ namespace llarp std::move(m_UpstreamQueue), r)); m_UpstreamQueue = nullptr; - std::vector< RelayUpstreamMessage > msgs; - do - { - auto maybe = m_UpstreamGather.tryPopFront(); - if(not maybe.has_value()) - break; - msgs.emplace_back(maybe.value()); - } while(true); - if(msgs.empty()) - return; - HandleAllUpstream(std::move(msgs), r); } void @@ -256,17 +257,6 @@ namespace llarp shared_from_this(), std::move(m_DownstreamQueue), r)); m_DownstreamQueue = nullptr; - std::vector< RelayDownstreamMessage > msgs; - do - { - auto maybe = m_DownstreamGather.tryPopFront(); - if(not maybe.has_value()) - break; - msgs.emplace_back(maybe.value()); - } while(true); - if(msgs.empty()) - return; - HandleAllDownstream(std::move(msgs), r); } bool diff --git a/llarp/path/transit_hop.hpp b/llarp/path/transit_hop.hpp index 3e9648526..1627bb24c 100644 --- a/llarp/path/transit_hop.hpp +++ b/llarp/path/transit_hop.hpp @@ -235,6 +235,8 @@ namespace llarp m_FlushOthers; thread::Queue< RelayUpstreamMessage > m_UpstreamGather; thread::Queue< RelayDownstreamMessage > m_DownstreamGather; + std::atomic< uint32_t > m_UpstreamWorkCounter; + std::atomic< uint32_t > m_DownstreamWorkCounter; }; inline std::ostream&