1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

move llarp::Logic to std::shared_ptr

add sequence numbers to HSD messages

begin work on network isolation code

add more docs
This commit is contained in:
Jeff Becker 2019-05-22 12:20:03 -04:00
parent 28623766c1
commit 9c96aecf3f
No known key found for this signature in database
GPG key ID: F357B3B42F6F9B05
39 changed files with 301 additions and 204 deletions

View file

@ -6,6 +6,12 @@ The distance function is A xor B (traditional kademlia)
The dht implements both iterative and recursive lookups.
Recursive lookups forward the request to a node closer if not found.
Iterative lookups return the key and of the DHT node who is closer.
In the case of iterative FRCM the RC of the closer router is provided.
In the case of iterative FIM the pubkey of a dht node who is closer is provided in the GIM.
find introduction message (FIM)
variant 1: find an IS by SA
@ -29,20 +35,13 @@ variant 2: recursively find many IS in a tag
V: 0
}
variant 3: find many IS by what services they advertise
{
A: "F",
H:
R: 0 for iterative and 1 for recurisve,
}
exclude adding service addresses in E if present
got introduction message (GIM)
sent in reply to FIM and PIM
sent in reply to FIM containing the result of the search
sent in reply to PIM to acknoledge the publishing of an IS
{
A: "G",
@ -53,7 +52,7 @@ sent in reply to FIM and PIM
}
The I value MUST NOT contain more than 4 IS.
The I value contain either 1 or 0 IS for PIM and FIM variant 1.
The I value MUST contain either 1 or 0 IS for PIM and FIM variant 1.
publish introduction message (PIM)
@ -71,18 +70,17 @@ R is currently set to 0 by the sender.
{
A: "I",
I: IS,
R: r_counter,
R: random_walk_counter,
S: optional member 0 for immediate store otherwise non zero,
T: transaction_id_uint64,
V: 0
}
if R is greater than 0, do a random walk, otherwise if we are
closer to the SA of the IS than anyone else we know in the DHT
store the IS for later lookup.
if R is greater than 0, decrement R and forward the request to a random DHT
node without storing the IS.
As of protocol version 0, R is always 0.
If S is provided store the IS for later lookup unconditionally,
If S is provided store the IS for later lookup unconditionally, then
decrement S by 1 and forward to dht peer who is next closest to
the SA of the IS. If S is greater than 3, don't store the IS and
discard this message.
@ -101,7 +99,15 @@ find a router by long term RC.k public key
V: 0
}
TODO: document me
if E is provided and non zero then return E dht nodes that are closest to K
if I is provided and non zero then this request is considered an iterative lookup
during an iterative lookup the response's GRCM.K is set to the pubkey of the router closer in key space.
during a recursive lookup the request is forwarded to a router who is closer in
keyspace to K.
If we have no peers that are closer to K and we don't have the RC known locally
we reply with a GRCM whose R value is emtpy.
In any case if we have a locally known RC with pubkey equal to K reply with
a GRCM with 1 RC in the R value corrisponding to that locally known RC.
got router contact message (GRCM)
@ -116,12 +122,11 @@ sent in reply to FRCM only
V: 0
}
in response to an exploritory FRCM if the target router is not found the form
in response to an exploritory router lookup, where FRCM.E is provided and non zero.
{
A: "S",
N: [list, of, router, publickeys, near, target],
R: [],
N: [list, of, router, publickeys, near, K],
T: transaction_id_uint64,
V: 0
}

View file

@ -682,7 +682,6 @@ generate a new convotag that is contained inside an encrypted HSD
D: "<N bytes encrypted HSD>",
F: "<16 bytes source path_id>",
N: "<32 bytes nonce for key exchange>",
S: sequence_number,
V: 0,
Z: "<64 bytes signature of entire message using sender's signing key>"
}
@ -782,6 +781,18 @@ transfer data on a converstation previously made
Z: "<64 bytes signature using sender's signing key>"
}
reject a message sent on a convo tag, when a remote endpoint
sends this message a new converstation SHOULD be established.
{
A: "H",
F: "<16 bytes path id of soruce>",
R: 1,
T: "<16 bytes converstation tag>",
V: 0,
Z: "<64 bytes signature using sender's signing key>"
}
transfer ip traffic message (TITM)

View file

@ -57,7 +57,7 @@ namespace llarp
std::unique_ptr< Crypto > crypto;
std::unique_ptr< AbstractRouter > router;
std::unique_ptr< llarp_threadpool > worker;
std::unique_ptr< Logic > logic;
std::shared_ptr< Logic > logic;
std::unique_ptr< Config > config;
std::unique_ptr< llarp_nodedb > nodedb;
llarp_ev_loop_ptr mainloop;

View file

@ -45,7 +45,7 @@ namespace abyss
virtual ~BaseReqHandler();
bool
ServeAsync(llarp_ev_loop_ptr loop, llarp::Logic* logic,
ServeAsync(llarp_ev_loop_ptr loop, std::shared_ptr<llarp::Logic> logic,
const sockaddr* bindaddr);
void
@ -76,7 +76,7 @@ namespace abyss
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop_ptr m_loop;
llarp::Logic* m_Logic;
std::shared_ptr<llarp::Logic> m_Logic;
llarp_tcp_acceptor m_acceptor;
std::list< std::unique_ptr< IRPCHandler > > m_Conns;
llarp_time_t m_ReqTimeout;

View file

@ -25,9 +25,9 @@ struct DemoHandler : public abyss::httpd::IRPCHandler
struct DemoCall : public abyss::http::IRPCClientHandler
{
std::function< void(void) > m_Callback;
llarp::Logic* m_Logic;
std::shared_ptr<llarp::Logic> m_Logic;
DemoCall(abyss::http::ConnImpl* impl, llarp::Logic* logic,
DemoCall(abyss::http::ConnImpl* impl, std::shared_ptr<llarp::Logic> logic,
std::function< void(void) > callback)
: abyss::http::IRPCClientHandler(impl)
, m_Callback(callback)
@ -36,16 +36,10 @@ struct DemoCall : public abyss::http::IRPCClientHandler
llarp::LogInfo("new call");
}
static void
CallCallback(void* u)
{
static_cast< DemoCall* >(u)->m_Callback();
}
bool HandleResponse(abyss::http::RPC_Response) override
{
llarp::LogInfo("response get");
m_Logic->queue_job({this, &CallCallback});
m_Logic->queue_func(m_Callback);
return true;
}
@ -64,9 +58,9 @@ struct DemoCall : public abyss::http::IRPCClientHandler
struct DemoClient : public abyss::http::JSONRPC
{
llarp_ev_loop_ptr m_Loop;
llarp::Logic* m_Logic;
std::shared_ptr<llarp::Logic> m_Logic;
DemoClient(llarp_ev_loop_ptr l, llarp::Logic* logic)
DemoClient(llarp_ev_loop_ptr l, std::shared_ptr<llarp::Logic> logic)
: abyss::http::JSONRPC(), m_Loop(std::move(l)), m_Logic(logic)
{
}
@ -125,7 +119,7 @@ main(ABSL_ATTRIBUTE_UNUSED int argc, ABSL_ATTRIBUTE_UNUSED char* argv[])
llarp::SetLogLevel(llarp::eLogDebug);
llarp_threadpool* threadpool = llarp_init_same_process_threadpool();
llarp_ev_loop_ptr loop = llarp_make_ev_loop();
llarp::Logic* logic = new llarp::Logic(threadpool);
auto logic = std::make_shared<llarp::Logic>(threadpool);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(1222);

View file

@ -326,8 +326,6 @@ namespace abyss
BaseReqHandler::BaseReqHandler(llarp_time_t reqtimeout)
: m_ReqTimeout(reqtimeout)
{
m_loop = nullptr;
m_Logic = nullptr;
m_acceptor.accepted = &BaseReqHandler::OnAccept;
m_acceptor.user = this;
m_acceptor.tick = &OnTick;
@ -335,10 +333,10 @@ namespace abyss
}
bool
BaseReqHandler::ServeAsync(llarp_ev_loop_ptr loop, llarp::Logic* logic,
BaseReqHandler::ServeAsync(llarp_ev_loop_ptr loop, std::shared_ptr<llarp::Logic> logic,
const sockaddr* bindaddr)
{
m_loop = std::move(loop);
m_loop = loop;
m_Logic = logic;
return llarp_tcp_serve(m_loop.get(), &m_acceptor, bindaddr);
}

View file

@ -265,12 +265,12 @@ __ ___ ____ _ _ ___ _ _ ____
// ensure netio thread
if(singleThreaded)
{
logic = std::make_unique< Logic >(worker.get());
logic = std::make_shared< Logic >(worker.get());
}
else
logic = std::make_unique< Logic >();
logic = std::make_shared< Logic >();
router = std::make_unique< Router >(worker.get(), mainloop, logic.get());
router = std::make_unique< Router >(worker.get(), mainloop, logic);
if(!router->Configure(config.get()))
{
llarp::LogError("Failed to configure router");
@ -301,7 +301,7 @@ __ ___ ____ _ _ ___ _ _ ____
// run net io thread
llarp::LogInfo("running mainloop");
llarp_ev_loop_run_single_process(mainloop, worker.get(), logic.get());
llarp_ev_loop_run_single_process(mainloop, worker.get(), logic);
// waits for router graceful stop
return 0;
}
@ -422,7 +422,7 @@ __ ___ ____ _ _ ___ _ _ ____
router.release();
llarp::LogDebug("free logic");
logic.release();
logic.reset();
RemovePIDFile();
}

View file

@ -1,15 +1,20 @@
#include <dns/server.hpp>
#include <crypto/crypto.hpp>
#include <util/logic.hpp>
#include <array>
namespace llarp
{
namespace dns
{
Proxy::Proxy(llarp_ev_loop_ptr loop, IQueryHandler* h)
: m_Loop(std::move(loop)), m_QueryHandler(h)
Proxy::Proxy(llarp_ev_loop_ptr serverLoop, Logic_ptr serverLogic,
llarp_ev_loop_ptr clientLoop, Logic_ptr clientLogic, IQueryHandler* h)
: m_ServerLoop(serverLoop),
m_ServerLogic(serverLogic),
m_ClientLoop(clientLoop),
m_ClientLogic(clientLogic),
m_QueryHandler(h)
{
m_Client.user = this;
m_Server.user = this;
@ -25,14 +30,20 @@ namespace llarp
}
bool
Proxy::Start(const llarp::Addr& addr,
Proxy::Start(const llarp::Addr addr,
const std::vector< llarp::Addr >& resolvers)
{
m_Resolvers.clear();
m_Resolvers = resolvers;
llarp::Addr any("0.0.0.0", 0);
return llarp_ev_add_udp(m_Loop.get(), &m_Server, addr) == 0
&& llarp_ev_add_udp(m_Loop.get(), &m_Client, any) == 0;
const llarp::Addr any("0.0.0.0", 0);
auto self = shared_from_this();
m_ClientLogic->queue_func([=]() {
llarp_ev_add_udp(self->m_ClientLoop.get(), &self->m_Client, any);
});
m_ServerLogic->queue_func([=]() {
llarp_ev_add_udp(self->m_ServerLoop.get(), &self->m_Server, addr);
});
return true;
}
void
@ -66,18 +77,39 @@ namespace llarp
}
void
Proxy::SendMessageTo(llarp::Addr to, Message msg)
Proxy::SendServerMessageTo(llarp::Addr to, Message msg)
{
std::array< byte_t, 1500 > tmp = {{0}};
llarp_buffer_t buf(tmp);
if(msg.Encode(&buf))
{
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
llarp_ev_udp_sendto(&m_Server, to, buf);
}
else
llarp::LogWarn("failed to encode dns message when sending");
auto self = shared_from_this();
m_ServerLogic->queue_func([to, msg, self]() {
std::array< byte_t, 1500 > tmp = {{0}};
llarp_buffer_t buf(tmp);
if(msg.Encode(&buf))
{
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
llarp_ev_udp_sendto(&self->m_Server, to, buf);
}
else
llarp::LogWarn("failed to encode dns message when sending");
});
}
void
Proxy::SendClientMessageTo(llarp::Addr to, Message msg)
{
auto self = shared_from_this();
m_ClientLogic->queue_func([to, msg, self]() {
std::array< byte_t, 1500 > tmp = {{0}};
llarp_buffer_t buf(tmp);
if(msg.Encode(&buf))
{
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
llarp_ev_udp_sendto(&self->m_Client, to, buf);
}
else
llarp::LogWarn("failed to encode dns message when sending");
});
}
void
@ -91,17 +123,20 @@ namespace llarp
}
TX tx = {hdr.id, from};
auto itr = m_Forwarded.find(tx);
if(itr != m_Forwarded.end())
{
llarp_buffer_t buf;
buf.sz = pkt->sz;
buf.base = pkt->base;
buf.cur = buf.base;
// forward reply
llarp_ev_udp_sendto(&m_Server, itr->second, buf);
// remove pending
m_Forwarded.erase(itr);
}
if(itr == m_Forwarded.end())
return;
const Addr requester = itr->second;
std::vector<byte_t> tmp(pkt->sz);
std::copy_n(pkt->cur, pkt->sz, tmp.begin());
auto self = shared_from_this();
m_ServerLogic->queue_func([=]() {
// forward reply to requester via server
llarp_buffer_t tmpbuf(tmp);
llarp_ev_udp_sendto(&self->m_Server, requester, tmpbuf);
});
// remove pending
m_Forwarded.erase(itr);
}
void
@ -121,12 +156,12 @@ namespace llarp
llarp::LogWarn("failed to parse dns message from ", from);
return;
}
auto self = shared_from_this();
if(m_QueryHandler && m_QueryHandler->ShouldHookDNSMessage(msg))
{
if(!m_QueryHandler->HandleHookedDNSMessage(
std::move(msg),
std::bind(&Proxy::SendMessageTo, this, from,
std::bind(&Proxy::SendServerMessageTo, self, from,
std::placeholders::_1)))
{
llarp::LogWarn("failed to handle hooked dns");
@ -137,19 +172,22 @@ namespace llarp
// no upstream resolvers
// let's serv fail it
msg.AddServFail();
SendMessageTo(from, std::move(msg));
SendServerMessageTo(from, std::move(msg));
}
else if(itr == m_Forwarded.end())
{
// new forwarded query
tx.from = PickRandomResolver();
m_Forwarded[tx] = from;
llarp_buffer_t buf;
buf.sz = pkt->sz;
buf.base = pkt->base;
buf.cur = buf.base;
// do query
llarp_ev_udp_sendto(&m_Client, tx.from, buf);
std::vector<byte_t> tmp(pkt->sz);
std::copy_n(pkt->cur, pkt->sz, tmp.begin());
m_ClientLogic->queue_func([=] {
// do query
llarp_buffer_t buf(tmp);
llarp_ev_udp_sendto(&self->m_Client, tx.from, buf);
});
}
else
{

View file

@ -5,6 +5,7 @@
#include <ev/ev.h>
#include <net/net.hpp>
#include <util/string_view.hpp>
#include <util/logic.hpp>
#include <unordered_map>
@ -29,12 +30,14 @@ namespace llarp
std::function< void(Message) > sendReply) = 0;
};
struct Proxy
struct Proxy : public std::enable_shared_from_this<Proxy>
{
Proxy(llarp_ev_loop_ptr loop, IQueryHandler* handler);
using Logic_ptr = std::shared_ptr<Logic>;
Proxy(llarp_ev_loop_ptr serverLoop, Logic_ptr serverLogic, llarp_ev_loop_ptr clientLoop, Logic_ptr clientLogic, IQueryHandler * handler);
bool
Start(const llarp::Addr& addr,
Start(const llarp::Addr addr,
const std::vector< llarp::Addr >& resolvers);
void
@ -63,7 +66,10 @@ namespace llarp
HandlePktServer(llarp::Addr from, llarp_buffer_t* buf);
void
SendMessageTo(llarp::Addr to, Message msg);
SendClientMessageTo(llarp::Addr to, Message msg);
void
SendServerMessageTo(llarp::Addr to, Message msg);
llarp::Addr
PickRandomResolver() const;
@ -71,7 +77,10 @@ namespace llarp
private:
llarp_udp_io m_Server;
llarp_udp_io m_Client;
llarp_ev_loop_ptr m_Loop;
llarp_ev_loop_ptr m_ServerLoop;
llarp_ev_loop_ptr m_ClientLoop;
Logic_ptr m_ServerLogic;
Logic_ptr m_ClientLogic;
IQueryHandler* m_QueryHandler;
std::vector< llarp::Addr > m_Resolvers;

View file

@ -43,7 +43,7 @@ llarp_make_ev_loop()
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
struct llarp_threadpool *tp,
llarp::Logic *logic)
std::shared_ptr<llarp::Logic> logic)
{
while(ev->running())
{

View file

@ -54,7 +54,7 @@ llarp_make_ev_loop();
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
struct llarp_threadpool *tp,
llarp::Logic *logic);
std::shared_ptr<llarp::Logic> logic);
/// get the current time on the event loop
llarp_time_t

View file

@ -286,10 +286,14 @@ namespace llarp
while(queue.size())
{
auto& msg = queue.front();
msg.S = path->NextSeqNo();
if(path && path->SendRoutingMessage(msg, router))
m_LastUse = now;
if(path)
{
msg.S = path->NextSeqNo();
if(path->SendRoutingMessage(msg, router))
m_LastUse = now;
}
queue.pop_front();
// spread across all paths
path = PickRandomEstablishedPath(llarp::path::ePathRoleExit);
}

View file

@ -25,7 +25,7 @@ namespace llarp
ExitEndpoint::ExitEndpoint(const std::string &name, AbstractRouter *r)
: m_Router(r)
, m_Resolver(r->netloop(), this)
, m_Resolver(std::make_shared<dns::Proxy>(r->netloop(), r->logic(), r->netloop(), r->logic(), this))
, m_Name(name)
, m_Tun{{0}, 0, {0}, 0, 0, 0, 0, 0, 0, 0}
, m_LocalResolverAddr("127.0.0.1", 53)
@ -292,7 +292,7 @@ namespace llarp
}
llarp::LogInfo("Trying to start resolver ",
m_LocalResolverAddr.ToString());
return m_Resolver.Start(m_LocalResolverAddr, m_UpstreamResolvers);
return m_Resolver->Start(m_LocalResolverAddr, m_UpstreamResolvers);
}
return true;
}

View file

@ -130,7 +130,7 @@ namespace llarp
KickIdentOffExit(const PubKey& pk);
AbstractRouter* m_Router;
dns::Proxy m_Resolver;
std::shared_ptr<dns::Proxy> m_Resolver;
bool m_ShouldInitTun;
std::string m_Name;
bool m_PermitExit;

View file

@ -15,6 +15,8 @@
#include <util/logic.hpp>
#include <nodedb.hpp>
#include <util/str.hpp>
namespace llarp
{
namespace handlers
@ -39,7 +41,6 @@ namespace llarp
r->netloop())
, m_NetworkToUserPktQueue(nickname + "_recvq", r->netloop(),
r->netloop())
, m_Resolver(r->netloop(), this)
{
#ifdef ANDROID
tunif.get_fd_promise = &get_tun_fd_promise;
@ -93,6 +94,22 @@ namespace llarp
bool
TunEndpoint::SetOption(const std::string &k, const std::string &v)
{
if(k == "isolate-network" && IsTrueValue(v.c_str()))
{
#if defined(__linux__)
LogInfo(Name(), " isolating network...");
if(!SpawnIsolatedNetwork())
{
LogError(Name(), " failed to spawn isolated network");
return false;
}
LogInfo(Name(), " booyeah network isolation succeeded");
return true;
#else
LogError(Name(), " network isolation is not supported on your platform");
return false;
#endif
}
if(k == "strict-connect")
{
RouterID connect;
@ -608,7 +625,7 @@ namespace llarp
llarp::LogError(Name(), " failed to set up network interface");
return false;
}
if(!m_Resolver.Start(m_LocalResolverAddr, m_UpstreamResolvers))
if(!m_Resolver->Start(m_LocalResolverAddr, m_UpstreamResolvers))
{
// downgrade DNS server failure to a warning
llarp::LogWarn(Name(), " failed to start dns server");

View file

@ -230,7 +230,7 @@ namespace llarp
#endif
/// our dns resolver
dns::Proxy m_Resolver;
std::shared_ptr<dns::Proxy> m_Resolver;
/// maps ip address to timestamp last active
std::unordered_map< huint32_t, llarp_time_t, huint32_t::Hash >

View file

@ -44,7 +44,7 @@ namespace llarp
}
bool
LinkLayer::Start(Logic* l)
LinkLayer::Start(std::shared_ptr<Logic> l)
{
if(!ILinkLayer::Start(l))
return false;

View file

@ -31,7 +31,7 @@ namespace llarp
}
bool
Start(Logic *l) override;
Start(std::shared_ptr<Logic> l) override;
std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact &rc,

View file

@ -245,7 +245,7 @@ namespace llarp
}
bool
ILinkLayer::Start(Logic* l)
ILinkLayer::Start(std::shared_ptr<Logic> l)
{
m_Logic = l;
ScheduleTick(100);

View file

@ -126,7 +126,7 @@ namespace llarp
TryEstablishTo(RouterContact rc);
virtual bool
Start(llarp::Logic* l);
Start(std::shared_ptr<llarp::Logic> l);
void
Stop();
@ -239,7 +239,7 @@ namespace llarp
bool
PutSession(const std::shared_ptr< ILinkSession >& s);
llarp::Logic* m_Logic = nullptr;
std::shared_ptr<llarp::Logic> m_Logic = nullptr;
llarp_ev_loop_ptr m_Loop;
Addr m_ourAddr;
llarp_udp_io m_udp;

View file

@ -242,8 +242,8 @@ namespace llarp
// store it into netdb if we don't have it
if(!n->Has(rc.pubkey))
{
llarp::Logic* logic = self->context->Router()->logic();
n->InsertAsync(rc, logic, [=]() {
std::shared_ptr<Logic> l = self->context->Router()->logic();
n->InsertAsync(rc, l, [=]() {
self->context->ForwardLRCM(self->hop->info.upstream,
self->frames);
self->hop = nullptr;

View file

@ -109,7 +109,7 @@ llarp_nodedb::getRCFilePath(const llarp::RouterID &pubkey) const
static void
handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc,
llarp::Logic *logic,
std::shared_ptr<llarp::Logic> logic,
const std::function< void(void) > &completedHook)
{
nodedb->Insert(rc);
@ -120,7 +120,7 @@ handle_async_insert_rc(llarp_nodedb *nodedb, const llarp::RouterContact &rc,
}
void
llarp_nodedb::InsertAsync(llarp::RouterContact rc, llarp::Logic *logic,
llarp_nodedb::InsertAsync(llarp::RouterContact rc, std::shared_ptr<llarp::Logic> logic,
std::function< void(void) > completionHandler)
{
disk->addJob(

View file

@ -89,7 +89,7 @@ struct llarp_nodedb
/// insert and write to disk in background
void
InsertAsync(llarp::RouterContact rc, llarp::Logic *l = nullptr,
InsertAsync(llarp::RouterContact rc, std::shared_ptr<llarp::Logic> l = nullptr,
std::function< void(void) > completionHandler = nullptr);
ssize_t
@ -153,7 +153,7 @@ struct llarp_async_verify_rc
/// nodedb storage
llarp_nodedb *nodedb;
// llarp::Logic for queue_job
llarp::Logic *logic; // includes a llarp_threadpool
std::shared_ptr<llarp::Logic> logic; // includes a llarp_threadpool
llarp_threadpool *cryptoworker;
llarp::thread::ThreadPool *diskworker;

View file

@ -63,7 +63,7 @@ namespace llarp
return m_Router->crypto();
}
Logic*
std::shared_ptr<Logic>
PathContext::logic()
{
return m_Router->logic();

View file

@ -708,7 +708,7 @@ namespace llarp
llarp::Crypto*
crypto();
Logic*
std::shared_ptr<Logic>
logic();
AbstractRouter*

View file

@ -26,7 +26,7 @@ namespace llarp
size_t idx = 0;
AbstractRouter* router = nullptr;
llarp_threadpool* worker = nullptr;
Logic* logic = nullptr;
std::shared_ptr<Logic> logic = nullptr;
Crypto* crypto = nullptr;
LR_CommitMessage LRCM;
@ -131,7 +131,7 @@ namespace llarp
/// Generate all keys asynchronously and call handler when done
void
AsyncGenerateKeys(Path_t p, Logic* l, llarp_threadpool* pool, User* u,
AsyncGenerateKeys(Path_t p, std::shared_ptr<Logic> l, llarp_threadpool* pool, User* u,
Handler func)
{
path = p;

View file

@ -62,7 +62,7 @@ namespace llarp
HandleRecvLinkMessageBuffer(ILinkSession *from,
const llarp_buffer_t &msg) = 0;
virtual Logic *
virtual std::shared_ptr<Logic>
logic() const = 0;
virtual llarp_dht_context *

View file

@ -215,7 +215,7 @@ namespace llarp
}
Router::Router(struct llarp_threadpool *_tp, llarp_ev_loop_ptr __netloop,
Logic *l)
std::shared_ptr<Logic> l)
: ready(false)
, _netloop(__netloop)
, tp(_tp)

View file

@ -86,7 +86,7 @@ namespace llarp
/// should we obey the service node whitelist?
bool whitelistRouters = false;
Logic *
std::shared_ptr<Logic>
logic() const override
{
return _logic;
@ -183,7 +183,7 @@ namespace llarp
llarp_ev_loop_ptr _netloop;
llarp_threadpool *tp;
Logic *_logic;
std::shared_ptr<Logic> _logic;
std::unique_ptr< Crypto > _crypto;
path::PathContext paths;
exit::Context _exitContext;
@ -318,7 +318,7 @@ namespace llarp
std::unordered_map< RouterID, llarp_time_t, PubKey::Hash > lokinetRouters;
Router(struct llarp_threadpool *tp, llarp_ev_loop_ptr __netloop,
Logic *logic);
std::shared_ptr<Logic> logic);
~Router();

View file

@ -8,7 +8,7 @@ namespace llarp
{
namespace service
{
AsyncKeyExchange::AsyncKeyExchange(Logic* l, Crypto* c,
AsyncKeyExchange::AsyncKeyExchange(std::shared_ptr<Logic> l, Crypto* c,
const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,

View file

@ -14,7 +14,7 @@ namespace llarp
{
struct AsyncKeyExchange
{
Logic* logic;
std::shared_ptr<Logic> logic;
Crypto* crypto;
SharedSecret sharedKey;
ServiceInfo remote;
@ -28,7 +28,7 @@ namespace llarp
IDataHandler* handler;
ConvoTag tag;
AsyncKeyExchange(Logic* l, Crypto* c, const ServiceInfo& r,
AsyncKeyExchange(std::shared_ptr<Logic> l, Crypto* c, const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote, IDataHandler* h,

View file

@ -108,12 +108,6 @@ namespace llarp
return true;
}
bool
Endpoint::IsolateNetwork()
{
return false;
}
llarp_ev_loop_ptr
Endpoint::EndpointNetLoop()
{
@ -126,13 +120,7 @@ namespace llarp
bool
Endpoint::NetworkIsIsolated() const
{
return m_IsolatedLogic && m_IsolatedWorker;
}
bool
Endpoint::SetupIsolatedNetwork(void* user, bool failed)
{
return static_cast< Endpoint* >(user)->DoNetworkIsolation(!failed);
return m_IsolatedLogic.get() != nullptr && m_IsolatedNetLoop != nullptr;
}
bool
@ -656,22 +644,20 @@ namespace llarp
m_OnReady = nullptr;
}
bool
Endpoint::DoNetworkIsolation(bool failed)
{
if(failed)
return IsolationFailed();
m_IsolatedNetLoop = llarp_make_ev_loop();
return SetupNetworking();
}
void
Endpoint::RunIsolatedMainLoop(void* user)
Endpoint::IsolatedNetworkMainLoop()
{
Endpoint* self = static_cast< Endpoint* >(user);
llarp_ev_loop_run_single_process(self->m_IsolatedNetLoop,
self->m_IsolatedWorker,
self->m_IsolatedLogic);
m_IsolatedNetLoop = llarp_make_ev_loop();
m_IsolatedLogic = std::make_shared<llarp::Logic>();
if(SetupNetworking())
llarp_ev_loop_run_single_process(m_IsolatedNetLoop,
m_IsolatedLogic->thread,
m_IsolatedLogic);
else
{
m_IsolatedNetLoop.reset();
m_IsolatedLogic.reset();
}
}
bool
@ -869,10 +855,7 @@ namespace llarp
{
if(msg->proto == eProtocolTraffic)
{
llarp_buffer_t buf(msg->payload);
return HandleWriteIPPacket(buf,
std::bind(&Endpoint::ObtainIPForAddr, this,
msg->sender.Addr(), false));
m_InboundTrafficQueue.emplace(msg);
}
else if(msg->proto == eProtocolControl)
{
@ -1060,16 +1043,31 @@ namespace llarp
void Endpoint::Pump(llarp_time_t)
{
EndpointLogic()->queue_func([&]() {
// send downstream packets to user for snode
for(const auto& item : m_SNodeSessions)
item.second->FlushDownstream();
// send downstrream traffic to user for hidden service
util::Lock lock(&m_InboundTrafficQueueMutex);
while(m_InboundTrafficQueue.size())
{
const auto & msg = m_InboundTrafficQueue.top();
llarp_buffer_t buf(msg->payload);
HandleWriteIPPacket(buf, [&]() -> huint32_t {
return ObtainIPForAddr(msg->sender.Addr(), false);
});
m_InboundTrafficQueue.pop();
}
});
auto router = Router();
// TODO: locking on this container
for(const auto& item : m_RemoteSessions)
item.second->FlushUpstream();
// TODO: locking on this container
for(const auto& item : m_SNodeSessions)
item.second->FlushUpstream();
util::Lock lock(&m_SendQueueMutex);
// send outbound traffic
for(const auto& item : m_SendQueue)
item.second->SendRoutingMessage(*item.first, router);
m_SendQueue.clear();
@ -1214,13 +1212,13 @@ namespace llarp
&& (NumInStatus(path::ePathBuilding) < m_NumPaths));
}
Logic*
std::shared_ptr<Logic>
Endpoint::RouterLogic()
{
return m_Router->logic();
}
Logic*
std::shared_ptr<Logic>
Endpoint::EndpointLogic()
{
return m_IsolatedLogic ? m_IsolatedLogic : m_Router->logic();

View file

@ -16,6 +16,8 @@
#include <service/session.hpp>
#include <service/tag_lookup_job.hpp>
#include <hook/ihook.hpp>
#include <util/compare_ptr.hpp>
#include <util/logic.hpp>
// minimum time between introset shifts
#ifndef MIN_SHIFT_INTERVAL
@ -90,12 +92,12 @@ namespace llarp
/// router's logic
/// use when sending any data on a path
Logic*
std::shared_ptr<Logic>
RouterLogic();
/// endpoint's logic
/// use when writing any data to local network interfaces
Logic*
std::shared_ptr<Logic>
EndpointLogic();
/// borrow endpoint's net loop for sending data to user on local network
@ -319,14 +321,21 @@ namespace llarp
void
PrefetchServicesByTag(const Tag& tag);
bool
IsolateNetwork();
/// spawn a new process that contains a network isolated process
/// return true if we set up isolation and the event loop is up
/// otherwise return false
virtual bool
SpawnIsolatedNetwork()
{
return false;
}
bool
NetworkIsIsolated() const;
static void
RunIsolatedMainLoop(void*);
/// this runs in the isolated network process
void
IsolatedNetworkMainLoop();
private:
void
@ -337,9 +346,6 @@ namespace llarp
OnLookup(const service::Address& addr, const IntroSet* i,
const RouterID& endpoint); /* */
static bool
SetupIsolatedNetwork(void* user, bool success);
bool
DoNetworkIsolation(bool failed);
@ -404,6 +410,13 @@ namespace llarp
using PendingTraffic =
std::unordered_map< Address, PendingBufferQueue, Address::Hash >;
using ProtocolMessagePtr = std::shared_ptr<ProtocolMessage>;
using RecvPacketQueue_t = std::priority_queue<ProtocolMessagePtr, std::vector<ProtocolMessagePtr>, ComparePtr<ProtocolMessagePtr> >;
util::Mutex m_InboundTrafficQueueMutex;
/// ordered queue for inbound hidden service traffic
RecvPacketQueue_t m_InboundTrafficQueue GUARDED_BY(m_InboundTrafficQueueMutex);
using PendingRouters =
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >;
@ -421,8 +434,7 @@ namespace llarp
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
AbstractRouter* m_Router;
llarp_threadpool* m_IsolatedWorker = nullptr;
Logic* m_IsolatedLogic = nullptr;
std::shared_ptr<Logic> m_IsolatedLogic = nullptr;
llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr;
std::string m_Keyfile;
std::string m_Name;

View file

@ -52,6 +52,8 @@ namespace llarp
}
if(!BEncodeMaybeReadDictEntry("i", introReply, read, k, buf))
return false;
if(!BEncodeMaybeReadDictInt("n", seqno, read, k, buf))
return false;
if(!BEncodeMaybeReadDictEntry("s", sender, read, k, buf))
return false;
if(!BEncodeMaybeReadDictEntry("t", tag, read, k, buf))
@ -74,6 +76,8 @@ namespace llarp
return false;
if(!BEncodeWriteDictEntry("i", introReply, buf))
return false;
if(!BEncodeWriteDictInt("n", seqno, buf))
return false;
if(!BEncodeWriteDictEntry("s", sender, buf))
return false;
if(!tag.IsZero())
@ -242,14 +246,14 @@ namespace llarp
struct AsyncFrameDecrypt
{
Crypto* crypto;
Logic* logic;
std::shared_ptr<Logic> logic;
std::shared_ptr< ProtocolMessage > msg;
const Identity& m_LocalIdentity;
IDataHandler* handler;
const ProtocolFrame frame;
const Introduction fromIntro;
AsyncFrameDecrypt(Logic* l, Crypto* c, const Identity& localIdent,
AsyncFrameDecrypt(std::shared_ptr<Logic> l, Crypto* c, const Identity& localIdent,
IDataHandler* h,
const std::shared_ptr< ProtocolMessage >& m,
const ProtocolFrame& f, const Introduction& recvIntro)
@ -362,7 +366,7 @@ namespace llarp
}
bool
ProtocolFrame::AsyncDecryptAndVerify(Logic* logic, Crypto* c,
ProtocolFrame::AsyncDecryptAndVerify(std::shared_ptr<Logic> logic, Crypto* c,
path::Path_ptr recvPath,
llarp_threadpool* worker,
const Identity& localIdent,

View file

@ -52,6 +52,7 @@ namespace llarp
/// local path we got this message from
PathID_t srcPath;
ConvoTag tag;
uint64_t seqno = 0;
bool
DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* val) override;
@ -64,6 +65,13 @@ namespace llarp
static void
ProcessAsync(std::shared_ptr< ProtocolMessage > self);
bool
operator<(const ProtocolMessage & other) const
{
return seqno < other.seqno;
}
};
/// outer message
@ -119,7 +127,7 @@ namespace llarp
Sign(Crypto* c, const Identity& localIdent);
bool
AsyncDecryptAndVerify(Logic* logic, Crypto* c, path::Path_ptr fromPath,
AsyncDecryptAndVerify(std::shared_ptr<Logic> logic, Crypto* c, path::Path_ptr fromPath,
llarp_threadpool* worker,
const Identity& localIdent,
IDataHandler* handler) const;

View file

@ -44,7 +44,6 @@ namespace llarp
if(item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
++sequenceNo;
}
else
LogError("Failed to send frame on path");
@ -79,28 +78,27 @@ namespace llarp
return;
}
if(m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
{
ProtocolMessage m;
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;
m.tag = f.T;
m.PutBuffer(payload);
if(!f.EncryptAndSign(crypto, m, shared, m_Endpoint->GetIdentity()))
{
LogError("failed to sign");
return;
}
}
else
if(!m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
{
LogError("No cached session key");
return;
}
ProtocolMessage m;
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.seqno = sequenceNo++;
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;
m.tag = f.T;
m.PutBuffer(payload);
if(!f.EncryptAndSign(crypto, m, shared, m_Endpoint->GetIdentity()))
{
LogError("failed to sign");
return;
}
Send(f, path);
}
@ -116,7 +114,7 @@ namespace llarp
LogWarn("no good path yet, your message may drop");
}
}
if(sequenceNo)
if(lastGoodSend)
{
EncryptAndSendTo(data, protocol);
}

View file

@ -13,7 +13,7 @@ using TestPipeReadFunc = std::function< bool(const llarp_buffer_t) >;
struct EventLoopTest : public ::testing::Test
{
llarp_ev_loop_ptr loop;
llarp::Logic _logic;
std::shared_ptr<llarp::Logic> _logic;
llarp::sodium::CryptoLibSodium crypto;
static void
@ -28,7 +28,8 @@ struct EventLoopTest : public ::testing::Test
SetUp()
{
loop = llarp_make_ev_loop();
_logic.call_later({10000, this, &OnTimeout});
_logic = std::make_shared<llarp::Logic>();
_logic->call_later({10000, this, &OnTimeout});
}
void
@ -42,7 +43,6 @@ struct EventLoopTest : public ::testing::Test
Stop()
{
llarp_ev_loop_stop(loop.get());
_logic.stop();
}
void
@ -50,12 +50,13 @@ struct EventLoopTest : public ::testing::Test
{
Stop();
loop.reset();
_logic.reset();
}
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, _logic.thread, &_logic);
llarp_ev_loop_run_single_process(loop, _logic->thread, _logic);
}
};

View file

@ -68,7 +68,7 @@ struct LinkLayerTest : public ::testing::Test
}
bool
Start(llarp::Logic* logic, llarp_ev_loop_ptr loop, uint16_t port)
Start(std::shared_ptr<llarp::Logic> logic, llarp_ev_loop_ptr loop, uint16_t port)
{
if(!link)
return false;
@ -107,7 +107,7 @@ struct LinkLayerTest : public ::testing::Test
bool success = false;
llarp_ev_loop_ptr netLoop;
std::unique_ptr< llarp::Logic > m_logic;
std::shared_ptr< llarp::Logic > m_logic;
llarp_time_t oldRCLifetime;
@ -148,7 +148,7 @@ struct LinkLayerTest : public ::testing::Test
RunMainloop()
{
m_logic->call_later({5000, this, &OnTimeout});
llarp_ev_loop_run_single_process(netLoop, m_logic->thread, m_logic.get());
llarp_ev_loop_run_single_process(netLoop, m_logic->thread, m_logic);
}
void
@ -247,8 +247,8 @@ TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(m_logic.get(), netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic.get(), netLoop, BobPort));
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
@ -336,8 +336,8 @@ TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
},
[&](llarp::RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(m_logic.get(), netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic.get(), netLoop, BobPort));
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));

View file

@ -13,7 +13,7 @@ struct AbyssTestBase : public ::testing::Test
llarp::sodium::CryptoLibSodium crypto;
llarp_threadpool* threadpool = nullptr;
llarp_ev_loop_ptr loop = nullptr;
std::unique_ptr< llarp::Logic > logic;
std::shared_ptr< llarp::Logic > logic;
abyss::httpd::BaseReqHandler* server = nullptr;
abyss::http::JSONRPC* client = nullptr;
const std::string method = "test.method";
@ -49,7 +49,7 @@ struct AbyssTestBase : public ::testing::Test
{
threadpool = llarp_init_same_process_threadpool();
loop = llarp_make_ev_loop();
logic = std::make_unique< llarp::Logic >(threadpool);
logic = std::make_shared< llarp::Logic >(threadpool);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
@ -58,7 +58,7 @@ struct AbyssTestBase : public ::testing::Test
llarp::Addr a(addr);
while(true)
{
if(server->ServeAsync(loop, logic.get(), a))
if(server->ServeAsync(loop, logic, a))
{
client->RunAsync(loop, a.ToString());
logic->call_later({1000, this, &CancelIt});
@ -177,7 +177,7 @@ struct AbyssTest : public AbyssTestBase,
void
RunLoop()
{
llarp_ev_loop_run_single_process(loop, threadpool, logic.get());
llarp_ev_loop_run_single_process(loop, threadpool, logic);
}
};