1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

Merge pull request #767 from majestrate/utp-write-split-2019-08-09

utp fixes
This commit is contained in:
Jeff 2019-08-12 09:08:58 -04:00 committed by GitHub
commit 25df36da2e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 21 deletions

View file

@ -302,7 +302,7 @@ namespace llarp
{ {
if(arg->state == UTP_STATE_WRITABLE) if(arg->state == UTP_STATE_WRITABLE)
{ {
session->PumpWrite(); session->Pump();
} }
else if(arg->state == UTP_STATE_EOF) else if(arg->state == UTP_STATE_EOF)
{ {

View file

@ -20,7 +20,7 @@ namespace llarp
/// pump tx queue /// pump tx queue
void void
Session::PumpWrite() Session::PumpWrite(size_t numSend)
{ {
if(!sock) if(!sock)
return; return;
@ -31,10 +31,11 @@ namespace llarp
{ {
for(const auto& vec : msg.vecs) for(const auto& vec : msg.vecs)
{ {
if(vec.iov_len) if(vec.iov_len > 0)
{ {
expect += vec.iov_len; expect += vec.iov_len;
send.emplace_back(vec); send.emplace_back(vec);
numSend--;
} }
} }
} }
@ -147,10 +148,15 @@ namespace llarp
Session::Tick(llarp_time_t now) Session::Tick(llarp_time_t now)
{ {
PruneInboundMessages(now); PruneInboundMessages(now);
m_TXRate = 0; // ensure that this section is called every 1s or so
m_RXRate = 0; if(now - m_LastTick >= 1000)
metrics::integerTick("utp.session.sendq", "size", sendq.size(), "id", {
RouterID(remoteRC.pubkey).ToString()); m_TXRate = 0;
m_RXRate = 0;
metrics::integerTick("utp.session.sendq", "size", sendq.size(), "id",
RouterID(remoteRC.pubkey).ToString());
m_LastTick = now;
}
} }
/// low level read /// low level read
@ -209,15 +215,29 @@ namespace llarp
return now - lastActive > 5000; return now - lastActive > 5000;
if(state == eSessionReady) if(state == eSessionReady)
{ {
// don't time out the connection if backlogged in downstream direction const bool remoteIsSNode = remoteRC.IsPublicRouter();
// for clients dangling off the side of the network const bool weAreSnode = parent->GetOurRC().IsPublicRouter();
const auto timestamp = const bool recvTimeout =
remoteRC.IsPublicRouter() && parent->GetOurRC().IsPublicRouter() (now > lastActive) && now - lastActive > sessionTimeout;
? lastSend const bool sendTimeout =
: lastActive; (now > lastSend) && now - lastSend > sessionTimeout;
if(now <= timestamp) if(remoteIsSNode && weAreSnode)
return false; {
return now - timestamp > 30000; /// for s2s connections only check write direction
return sendTimeout;
}
else if(weAreSnode)
{
// for edge connection as service node check either direction for
// timeout
return recvTimeout || sendTimeout;
}
else
{
/// for edge connections as client we check if both directions have
/// been silent for 60s
return recvTimeout && sendTimeout;
}
} }
if(state == eLinkEstablished) if(state == eLinkEstablished)
return now - lastActive return now - lastActive
@ -270,8 +290,10 @@ namespace llarp
void void
Session::Pump() Session::Pump()
{ {
// pump write queue if(sendq.size() > (MaxSendQueueSize / 4))
PumpWrite(); PumpWrite(sendq.size() / 2);
else
PumpWrite(sendq.size());
// prune inbound messages // prune inbound messages
PruneInboundMessages(parent->Now()); PruneInboundMessages(parent->Now());
} }
@ -284,7 +306,7 @@ namespace llarp
if(SendQueueBacklog() >= MaxSendQueueSize) if(SendQueueBacklog() >= MaxSendQueueSize)
{ {
// pump write queue if we seem to be full // pump write queue if we seem to be full
PumpWrite(); PumpWrite(MaxSendQueueSize / 2);
} }
if(SendQueueBacklog() >= MaxSendQueueSize) if(SendQueueBacklog() >= MaxSendQueueSize)
{ {
@ -310,7 +332,7 @@ namespace llarp
sz -= s; sz -= s;
} }
if(state != eSessionReady) if(state != eSessionReady)
PumpWrite(); PumpWrite(sendq.size());
return true; return true;
} }

View file

@ -101,6 +101,8 @@ namespace llarp
uint64_t m_RXRate = 0; uint64_t m_RXRate = 0;
uint64_t m_TXRate = 0; uint64_t m_TXRate = 0;
llarp_time_t m_LastTick = 0;
/// mark session as alive /// mark session as alive
void void
Alive(); Alive();
@ -147,7 +149,7 @@ namespace llarp
/// pump tx queue /// pump tx queue
void void
PumpWrite(); PumpWrite(size_t numMessages);
void void
Pump() override; Pump() override;