1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

Merge remote-tracking branch 'origin/master'

This commit is contained in:
Jeff 2018-10-21 10:58:01 -04:00
commit 390333c787
30 changed files with 389 additions and 573 deletions

View file

@ -63,6 +63,10 @@ else()
set(WITH_STATIC ON)
endif()
if(TESTNET)
add_definitions(-DTESTNET=1)
endif()
add_cflags("-Wall")
add_cxxflags("-Wall")
@ -318,7 +322,6 @@ set(LIB_SRC
${UTP_SRC}
${NTRU_SRC}
llarp/address_info.cpp
llarp/arpc.cpp
llarp/bencode.cpp
llarp/buffer.cpp
llarp/config.cpp
@ -348,6 +351,7 @@ set(LIB_SRC
llarp/relay_up_down.cpp
llarp/router_contact.cpp
llarp/router.cpp
llarp/rpc.cpp
llarp/service.cpp
llarp/transit_hop.cpp
llarp/testnet.c
@ -439,27 +443,42 @@ include_directories(${sodium_INCLUDE_DIR})
set(RC_EXE rcutil)
set(DNS_EXE dns)
set(ABYSS ${CMAKE_SOURCE_DIR}/libabyss)
set(ABYSS_LIB abyss)
include_directories(${ABYSS}/include)
set(ABYSS_SRC
${ABYSS}/src/http.cpp
${ABYSS}/src/client.cpp
${ABYSS}/src/server.cpp
${ABYSS}/src/lib.cpp)
add_library(${ABYSS_LIB} ${ABYSS_SRC})
if(SHADOW)
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC})
add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC})
target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS})
install(TARGETS shadow-plugin-${SHARED_LIB} DESTINATION plugins)
else()
add_executable(${RC_EXE} ${RC_SRC})
add_executable(${EXE} ${EXE_SRC})
add_executable(${CLIENT_EXE} ${CLIENT_SRC})
add_executable(${DNS_EXE} ${DNS_SRC})
add_subdirectory(${GTEST_DIR})
include_directories(${GTEST_DIR}/include ${GTEST_DIR})
add_executable(${TEST_EXE} ${TEST_SRC})
if(WITH_STATIC)
add_library(${STATIC_LIB} STATIC ${LIB_SRC})
if(NOT HAVE_CXX17_FILESYSTEM)
add_library(${BACKPORT_LIB} STATIC ${CPP_BACKPORT_SRC})
endif(NOT HAVE_CXX17_FILESYSTEM)
add_library(${PLATFORM_LIB} STATIC ${LIB_PLATFORM_SRC})
target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB})
target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB} ${ABYSS_LIB})
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
target_link_libraries(${PLATFORM_LIB} -lcap)
endif()

View file

@ -91,7 +91,7 @@ testnet-clean: clean
rm -rf $(TESTNET_ROOT)
testnet-configure: testnet-clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX)
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTESTNET=1
testnet-build: testnet-configure
ninja
@ -105,7 +105,7 @@ shared: shared-configure
testnet:
cp $(EXE) $(TESTNET_EXE)
mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=3
python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=4
LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
test: debug

View file

@ -1,64 +0,0 @@
#ifndef LLARP_ARPC_HPP
#define LLARP_ARPC_HPP
#include <llarp/bencode.hpp>
#include <llarp/crypto.hpp>
#include <llarp/logger.hpp>
#include <llarp/time.h>
#include <llarp/endian.h>
#include <llarp/ev.h>
#include <functional>
#include <string>
#include <map>
#include <unordered_map>
#ifndef _WIN32
#include <sys/un.h>
#endif
#include <llarp/net.hpp>
// forward declare
struct llarp_router;
namespace llarp
{
namespace arpc
{
// forward declare
struct BaseMessage;
struct Server
{
llarp_tcp_acceptor m_acceptor;
llarp_router* router;
Server(llarp_router* r);
static void
OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn);
bool
Start(const std::string& bindaddr);
const llarp_crypto*
Crypto() const;
const byte_t*
SigningPublicKey() const
{
return llarp::seckey_topublic(SigningPrivateKey());
}
const byte_t*
SigningPrivateKey() const;
bool
Sign(BaseMessage* msg) const;
};
} // namespace arpc
} // namespace llarp
#endif

View file

@ -21,8 +21,9 @@ namespace llarp
relayed = relay;
}
FindIntroMessage(const llarp::service::Tag& tag, uint64_t txid)
: IMessage({}), N(tag), T(txid)
FindIntroMessage(const llarp::service::Tag& tag, uint64_t txid,
uint64_t r = 3)
: IMessage({}), R(r), N(tag), T(txid)
{
S.Zero();
}

View file

@ -94,6 +94,8 @@ struct llarp_tcp_conn
void (*read)(struct llarp_tcp_conn *, const void *, size_t);
/// handle close event (free-ing is handled by event loop)
void (*closed)(struct llarp_tcp_conn *);
/// handle event loop tick
void (*tick)(struct llarp_tcp_conn *);
};
/// queue async write a buffer in full
@ -120,7 +122,8 @@ struct llarp_tcp_acceptor
/// return false if failed to bind
/// return true on successs
bool
llarp_tcp_serve(struct llarp_tcp_acceptor *t, const sockaddr *bindaddr);
llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *t,
const sockaddr *bindaddr);
/// close and stop accepting connections
void

View file

@ -404,7 +404,7 @@ namespace llarp
HandleRelayCommit(const LR_CommitMessage* msg);
void
PutTransitHop(TransitHop* hop);
PutTransitHop(std::shared_ptr< TransitHop > hop);
IHopHandler*
GetByUpstream(const RouterID& id, const PathID_t& path);
@ -443,7 +443,8 @@ namespace llarp
void
RemovePathSet(PathSet* set);
typedef std::multimap< PathID_t, TransitHop* > TransitHopsMap_t;
typedef std::multimap< PathID_t, std::shared_ptr< TransitHop > >
TransitHopsMap_t;
typedef std::pair< util::Mutex, TransitHopsMap_t > SyncTransitMap_t;

31
include/llarp/rpc.hpp Normal file
View file

@ -0,0 +1,31 @@
#ifndef LLARP_RPC_HPP
#define LLARP_RPC_HPP
#include <llarp/time.h>
#include <llarp/ev.h>
#include <string>
// forward declare
struct llarp_router;
namespace llarp
{
namespace rpc
{
struct ServerImpl;
struct Server
{
Server(llarp_router* r);
~Server();
bool
Start(const std::string& bindaddr);
private:
ServerImpl* m_Impl;
};
} // namespace rpc
} // namespace llarp
#endif

View file

@ -21,11 +21,7 @@ namespace llarp
Tag(const std::string& str) : Tag()
{
#ifndef MIN
#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
memcpy(data(), str.c_str(), MIN(16UL, str.size()));
#undef MIN
#endif
memcpy(data(), str.c_str(), std::min(16UL, str.size()));
}
Tag&
@ -38,11 +34,7 @@ namespace llarp
Tag&
operator=(const std::string& str)
{
#ifndef MIN
#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
memcpy(data(), str.data(), MIN(16UL, str.size()));
#undef MIN
#endif
memcpy(data(), str.data(), std::min(16UL, str.size()));
return *this;
}
@ -67,4 +59,4 @@ namespace llarp
} // namespace service
} // namespace llarp
#endif
#endif

15
libabyss/CMakeLists.txt Normal file
View file

@ -0,0 +1,15 @@
set(ABYSS src)
set(ABYSS_LIB abyss)
include_directories(include)
set(ABYSS_SRC
${ABYSS}/http.cpp
${ABYSS}/client.cpp
${ABYSS}/server.cpp
${ABYSS}/lib.cpp
)
add_library(${ABYSS_LIB} ${ABYSS_SRC})

View file

@ -0,0 +1,13 @@
#ifdnef __LIB_ABYSS_H__
#define __LIB_ABYSS_H__
#include <llarp/ev.h>
#ifdef __cplusplus
extern "C"
{
#endif
#ifdef __cplusplus
}
#endif
#endif

View file

@ -0,0 +1,39 @@
#ifndef __LIB_ABYSS_HPP__
#define __LIB_ABYSS_HPP__
#include <llarp/ev.h>
#include <llarp/logic.h>
#include <llarp/time.h>
#include <vector>
#include <memory>
namespace abyss
{
namespace http
{
// forward declare
struct ConnHandler;
struct BaseReqHandler
{
BaseReqHandler(llarp_time_t req_timeout);
~BaseReqHandler();
bool
ServeAsync(llarp_ev_loop* loop, llarp_logic* logic,
const sockaddr* bindaddr);
private:
static void
OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*);
llarp_ev_loop* m_loop;
llarp_logic* m_Logic;
llarp_tcp_acceptor m_acceptor;
std::vector< std::unique_ptr< ConnHandler > > m_Conns;
llarp_time_t m_ReqTimeout;
};
} // namespace http
} // namespace abyss
#endif

0
libabyss/src/client.cpp Normal file
View file

0
libabyss/src/http.cpp Normal file
View file

0
libabyss/src/lib.c Normal file
View file

0
libabyss/src/lib.cpp Normal file
View file

56
libabyss/src/server.cpp Normal file
View file

@ -0,0 +1,56 @@
#include <libabyss.hpp>
#include <llarp/time.h>
namespace abyss
{
namespace http
{
struct ConnHandler
{
llarp_tcp_conn* _conn;
llarp_time_t m_LastActive;
llarp_time_t m_ReadTimeout;
ConnHandler(llarp_tcp_conn* c, llarp_time_t readtimeout) : _conn(c)
{
m_LastActive = llarp_time_now_ms();
m_ReadTimeout = readtimeout;
}
bool
ShouldClose(llarp_time_t now) const
{
return now - m_LastActive > m_ReadTimeout;
}
void
Begin()
{
}
};
BaseReqHandler::BaseReqHandler(llarp_time_t reqtimeout)
: m_ReqTimeout(reqtimeout)
{
m_loop = nullptr;
m_Logic = nullptr;
m_acceptor.accepted = &BaseReqHandler::OnAccept;
m_acceptor.user = this;
}
BaseReqHandler::~BaseReqHandler()
{
llarp_tcp_acceptor_close(&m_acceptor);
}
void
BaseReqHandler::OnAccept(llarp_tcp_acceptor* acceptor, llarp_tcp_conn* conn)
{
BaseReqHandler* self = static_cast< BaseReqHandler* >(acceptor->user);
ConnHandler* handler = new ConnHandler(conn, self->m_ReqTimeout);
conn->user = handler;
self->m_Conns.emplace_back(handler);
}
} // namespace http
} // namespace abyss

View file

@ -1,405 +0,0 @@
#include <llarp/arpc.hpp>
namespace llarp
{
namespace arpc
{
/// interface for request messages
struct IRequest
{
/// returns false if errmsg is set
/// returns true if retval is set
virtual bool
HandleRequest(Server* ctx, std::unique_ptr< BaseMessage >& retval,
std::string& errmsg) const = 0;
};
struct BaseMessage : public llarp::IBEncodeMessage, public IRequest
{
static constexpr size_t MaxIDSize = 128;
/// maximum size of a message
static constexpr size_t MaxSize = 1024 * 8;
BaseMessage()
{
timestamp = llarp_time_now_ms();
zkey.Zero();
zsig.Zero();
}
std::string m_id;
llarp_time_t timestamp;
llarp::PubKey zkey;
llarp::Signature zsig;
/// override me
virtual std::string
Method() const = 0;
/// encode the entire message
bool
BEncode(llarp_buffer_t* buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictString("aRPC-method", Method(), buf))
return false;
if(!BEncodeWriteDictString("id", m_id, buf))
return false;
if(!BEncodeBody(buf))
return false;
if(!zkey.IsZero())
{
if(!BEncodeWriteDictEntry("z-key", zkey, buf))
return false;
if(!BEncodeWriteDictEntry("z-sig", zsig, buf))
return false;
}
return bencode_end(buf);
}
bool
DecodeKey(llarp_buffer_t k, llarp_buffer_t* buf)
{
if(llarp_buffer_eq(k, "id"))
{
return DecodeID(buf);
}
if(llarp_buffer_eq(k, "params"))
{
return DecodeParams(buf);
}
return false;
}
protected:
typedef bool (*ParamDecoder)(dict_reader*, llarp_buffer_t*);
virtual ParamDecoder
GetParamDecoder() const = 0;
bool
DecodeParams(llarp_buffer_t* buf)
{
dict_reader r;
r.user = this;
r.on_key = GetParamDecoder();
return bencode_read_dict(buf, &r);
}
bool
DecodeID(llarp_buffer_t* buf)
{
llarp_buffer_t strbuf;
if(!bencode_read_string(buf, &strbuf))
return false;
if(strbuf.sz > MaxIDSize) // too big
return false;
m_id = std::string((char*)strbuf.base, strbuf.sz);
return true;
}
/// encode body of message
virtual bool
BEncodeBody(llarp_buffer_t* buf) const = 0;
};
struct ConnHandler
{
ConnHandler(Server* s, llarp_tcp_conn* c) : parent(s), m_conn(c)
{
left = 0;
readingHeader = true;
}
bool readingHeader;
Server* parent;
llarp_tcp_conn* m_conn;
AlignedBuffer< BaseMessage::MaxSize > buf;
uint16_t left;
void
ParseMessage();
void
Close()
{
llarp_tcp_conn_close(m_conn);
}
static void
OnClosed(llarp_tcp_conn* conn)
{
ConnHandler* self = static_cast< ConnHandler* >(conn->user);
delete self;
}
static void
OnRead(llarp_tcp_conn* conn, const void* buf, size_t sz)
{
ConnHandler* self = static_cast< ConnHandler* >(conn->user);
const byte_t* ptr = (const byte_t*)buf;
do
{
if(self->readingHeader)
{
self->left = bufbe16toh(ptr);
sz -= 2;
ptr += 2;
self->readingHeader = false;
}
size_t dlt = std::min((size_t)self->left, sz);
memcpy(self->buf.data() + (self->buf.size() - self->left), ptr, dlt);
self->left -= dlt;
sz -= dlt;
if(self->left == 0)
{
self->ParseMessage();
self->readingHeader = true;
}
} while(sz > 0);
}
};
/// base type for ping req/resp
struct Ping : public BaseMessage
{
Ping() : BaseMessage()
{
}
uint64_t ping;
std::string
Method() const
{
return "llarp.rpc.ping";
}
bool
BEncodeBody(llarp_buffer_t* buf) const
{
if(!bencode_write_bytestring(buf, "params", 6))
return false;
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictInt("ping", ping, buf))
return false;
return bencode_end(buf);
}
static bool
OnParamKey(dict_reader* r, llarp_buffer_t* k)
{
Ping* self = static_cast< Ping* >(r->user);
if(k && llarp_buffer_eq(*k, "ping"))
{
return bencode_read_integer(r->buffer, &self->ping);
}
else
return k == nullptr;
}
virtual ParamDecoder
GetParamDecoder() const
{
return &OnParamKey;
}
};
struct PingResponse : public Ping
{
PingResponse(uint64_t p) : Ping()
{
ping = p;
}
bool
HandleRequest(Server*, std::unique_ptr< BaseMessage >&,
std::string&) const
{
/// TODO: handle client response
llarp::LogInfo(Method(), "pong ", ping);
return false;
}
};
struct PingRequest : public Ping
{
bool
HandleRequest(Server* serv, std::unique_ptr< BaseMessage >& retval,
std::string& errmsg) const
{
PingResponse* resp = new PingResponse(ping);
if(!serv->Sign(resp))
{
errmsg = "failed to sign response";
return false;
}
retval.reset(resp);
return true;
}
};
struct MessageReader
{
dict_reader m_reader;
BaseMessage* msg = nullptr;
MessageReader()
{
m_reader.user = this;
m_reader.on_key = &OnKey;
}
static bool
OnKey(dict_reader* r, llarp_buffer_t* key)
{
static std::unordered_map< std::string,
const std::function< BaseMessage*(void) > >
msgConstructors = {
{"llarp.rpc.ping",
[]() -> BaseMessage* { return new PingRequest(); }},
};
MessageReader* self = static_cast< MessageReader* >(r->user);
if(self->msg == nullptr)
{
// first key
if(key == nullptr || !llarp_buffer_eq(*key, "aRPC-method"))
{
// bad value
return false;
}
llarp_buffer_t strbuf;
if(!bencode_read_string(r->buffer, &strbuf))
return false;
std::string method = std::string((char*)strbuf.base, strbuf.sz);
auto itr = msgConstructors.find(method);
if(itr == msgConstructors.end())
{
// no such method
return false;
}
else
self->msg = itr->second();
return true;
}
else if(key)
return self->msg->DecodeKey(*key, r->buffer);
else
return true;
}
bool
DecodeMessage(llarp_buffer_t* buf,
std::unique_ptr< BaseMessage >& request)
{
msg = nullptr;
if(!bencode_read_dict(buf, &m_reader))
return false;
request.reset(msg);
return true;
}
};
Server::Server(llarp_router* r)
{
router = r;
m_acceptor.user = this;
m_acceptor.accepted = &OnAccept;
}
bool
Server::Start(const std::string& bindaddr)
{
llarp::Addr addr;
sockaddr* saddr = nullptr;
#ifndef _WIN32
sockaddr_un unaddr;
if(bindaddr.find("unix:") == 0)
{
unaddr.sun_family = AF_UNIX;
strncpy(unaddr.sun_path, bindaddr.substr(5).c_str(),
sizeof(unaddr.sun_path));
saddr = (sockaddr*)&unaddr;
}
else
#endif
{
// TODO: ipv6
auto idx = bindaddr.find(':');
std::string host = bindaddr.substr(0, idx);
uint16_t port = std::stoi(bindaddr.substr(idx + 1));
addr = llarp::Addr(host, port);
saddr = (sockaddr*)addr;
}
return llarp_tcp_serve(&m_acceptor, saddr);
}
void
Server::OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn)
{
Server* self = static_cast< Server* >(a->user);
conn->user = new ConnHandler(self, conn);
conn->read = &ConnHandler::OnRead;
conn->closed = &ConnHandler::OnClosed;
}
bool
Server::Sign(BaseMessage* msg) const
{
msg->zkey = SigningPublicKey();
msg->zsig.Zero();
llarp::Signature sig;
//
byte_t tmp[BaseMessage::MaxSize];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!msg->BEncode(&buf))
return false;
// rewind buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if(!Crypto()->sign(sig, SigningPrivateKey(), buf))
return false;
msg->zsig = sig;
return true;
}
void
ConnHandler::ParseMessage()
{
std::unique_ptr< BaseMessage > msg;
std::unique_ptr< BaseMessage > response;
std::string errmsg;
MessageReader r;
auto tmp = llarp::Buffer(buf);
if(!r.DecodeMessage(&tmp, msg))
{
llarp::LogError("failed to decode message");
Close();
return;
}
// handle request
if(!msg->HandleRequest(parent, response, errmsg))
{
// TODO: send error reply
llarp::LogError("failed to handle api message: ", errmsg);
Close();
return;
}
if(!parent->Sign(response.get()))
{
llarp::LogError("failed to sign response");
Close();
}
}
} // namespace arpc
} // namespace llarp

View file

@ -132,12 +132,6 @@ namespace llarp
ctx->ScheduleCleanupTimer();
}
void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
const llarp::PathID_t &path, const Key_t &askpeer)
{
}
std::set< service::IntroSet >
Context::FindRandomIntroSetsWithTagExcluding(
const service::Tag &tag, size_t max,
@ -567,7 +561,8 @@ namespace llarp
void
Start(const TXOwner &peer)
{
parent->DHTSendTo(peer.node, new FindIntroMessage(target, peer.txid));
parent->DHTSendTo(peer.node,
new FindIntroMessage(target, peer.txid, R));
}
bool
@ -590,10 +585,10 @@ namespace llarp
found.insert(remoteTag);
}
// collect our local values if we haven't hit a limit
if(found.size() < 8)
if(found.size() < 3)
{
for(const auto &localTag :
parent->FindRandomIntroSetsWithTagExcluding(target, 2, found))
parent->FindRandomIntroSetsWithTagExcluding(target, 1, found))
{
found.insert(localTag);
}
@ -616,6 +611,53 @@ namespace llarp
TXOwner asker(whoasked, whoaskedTX);
TXOwner peer(askpeer, ++ids);
pendingTagLookups.NewTX(peer, tag, new TagLookup(asker, tag, this, R));
llarp::LogInfo("ask ", askpeer, " for ", tag, " on behalf of ", whoasked,
" R=", R);
}
struct LocalTagLookup : public TagLookup
{
PathID_t localPath;
LocalTagLookup(const PathID_t &path, uint64_t txid,
const service::Tag &target, Context *ctx)
: TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 3)
, localPath(path)
{
}
void
SendReply()
{
auto path =
parent->router->paths.GetByUpstream(parent->OurKey(), localPath);
if(!path)
{
llarp::LogWarn(
"did not send reply for relayed dht request, no such local path "
"for pathid=",
localPath);
return;
}
routing::DHTMessage msg;
msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid));
if(!path->SendRoutingMessage(&msg, parent->router))
{
llarp::LogWarn(
"failed to send routing message when informing result of dht "
"request, pathid=",
localPath);
}
}
};
void
Context::LookupTagForPath(const service::Tag &tag, uint64_t txid,
const llarp::PathID_t &path, const Key_t &askpeer)
{
TXOwner peer(askpeer, ++ids);
pendingTagLookups.NewTX(peer, tag,
new LocalTagLookup(path, txid, tag, this));
}
bool

View file

@ -182,13 +182,22 @@ namespace llarp
replies.emplace_back(new GotIntroMessage(reply, T));
return true;
}
else
else if(R < 5)
{
// tag lookup
if(dht.nodes->GetRandomNodeExcluding(peer, exclude))
{
dht.LookupTagRecursive(N, From, T, peer, R - 1);
}
else
{
replies.emplace_back(new GotIntroMessage({}, T));
}
}
else
{
// too big R value
replies.emplace_back(new GotIntroMessage({}, T));
}
}
}

View file

@ -41,14 +41,14 @@ namespace llarp
auto tagLookup = dht.pendingTagLookups.GetPendingLookupFrom(owner);
if(tagLookup)
{
dht.pendingTagLookups.Inform(owner, tagLookup->target, I);
dht.pendingTagLookups.Found(owner, tagLookup->target, I);
return true;
}
auto serviceLookup =
dht.pendingIntrosetLookups.GetPendingLookupFrom(owner);
if(serviceLookup)
{
dht.pendingIntrosetLookups.Inform(owner, serviceLookup->target, I);
dht.pendingIntrosetLookups.Found(owner, serviceLookup->target, I);
return true;
}
llarp::LogError("no pending TX for GIM from ", From, " txid=", T);

View file

@ -111,7 +111,6 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun)
tun->impl = dev;
if(dev)
{
loop->tun_listeners.push_back(tun);
return loop->add_ev(dev, true);
}
return false;
@ -131,6 +130,12 @@ llarp_tcp_serve(struct llarp_tcp_acceptor *tcp, const struct sockaddr *bindaddr)
return false;
}
void
llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp)
{
// TODO: implement me
}
void
llarp_tcp_conn_close(struct llarp_tcp_conn *conn)
{

View file

@ -49,6 +49,9 @@ namespace llarp
virtual int
sendto(const sockaddr* dst, const void* data, size_t sz) = 0;
virtual void
tick(){};
/// used for tun interface and tcp conn
virtual bool
do_write(void* data, size_t sz)
@ -191,23 +194,13 @@ struct llarp_ev_loop
virtual ~llarp_ev_loop(){};
std::vector< llarp_udp_io* > udp_listeners;
std::vector< llarp_tun_io* > tun_listeners;
std::vector< std::unique_ptr< llarp::ev_io > > handlers;
void
tick_listeners()
{
for(auto& l : udp_listeners)
if(l->tick)
l->tick(l);
for(auto& l : tun_listeners)
{
if(l->tick)
l->tick(l);
if(l->before_write)
l->before_write(l);
static_cast< llarp::ev_io* >(l->impl)->flush_write();
}
for(const auto& h : handlers)
h->tick();
}
};

View file

@ -16,6 +16,14 @@
namespace llarp
{
struct tcp_serv : public ev_io
{
};
struct tcp_conn : public ev_io
{
};
struct udp_listener : public ev_io
{
llarp_udp_io* udp;
@ -26,6 +34,13 @@ namespace llarp
{
}
virtual void
tick()
{
if(udp->tick)
udp->tick(udp);
}
virtual int
read(void* buf, size_t sz)
{
@ -84,6 +99,14 @@ namespace llarp
return -1;
}
virtual void
tick()
{
if(t->tick)
t->tick(t);
flush_write();
}
void
flush_write()
{
@ -270,7 +293,14 @@ struct llarp_epoll_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
return epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) != -1;
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1)
return false;
// deallocate
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
});
return true;
}
llarp::ev_io*
@ -278,7 +308,10 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
llarp::tun* t = new llarp::tun(tun);
if(t->setup())
{
handlers.emplace_back(t);
return t;
}
delete t;
return nullptr;
}
@ -289,9 +322,9 @@ struct llarp_epoll_loop : public llarp_ev_loop
int fd = udp_bind(src);
if(fd == -1)
return nullptr;
llarp::udp_listener* listener = new llarp::udp_listener(fd, l);
l->impl = listener;
udp_listeners.push_back(l);
handlers.emplace_back(new llarp::udp_listener(fd, l));
llarp::ev_io* listener = handlers.back().get();
l->impl = listener;
return listener;
}
@ -321,9 +354,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
{
close_ev(listener);
l->impl = nullptr;
delete listener;
std::remove_if(udp_listeners.begin(), udp_listeners.end(),
[l](llarp_udp_io* i) -> bool { return i == l; });
ret = true;
}
return ret;
}

View file

@ -985,10 +985,14 @@ namespace llarp
bool
IsBogon(const in6_addr& addr)
{
#ifdef TESTNET
return false;
#else
if(!ipv6_is_siit(addr))
return false;
return IsIPv4Bogon(ipaddr_ipv4_bits(addr.s6_addr[12], addr.s6_addr[13],
addr.s6_addr[14], addr.s6_addr[15]));
#endif
}
bool

View file

@ -143,9 +143,10 @@ namespace llarp
bool
PathContext::HasTransitHop(const TransitHopInfo& info)
{
return MapHas(m_TransitPaths, info.txID, [info](TransitHop* hop) -> bool {
return info == hop->info;
});
return MapHas(m_TransitPaths, info.txID,
[info](const std::shared_ptr< TransitHop >& hop) -> bool {
return info == hop->info;
});
}
IHopHandler*
@ -163,20 +164,24 @@ namespace llarp
return own;
return MapGet(m_TransitPaths, id,
[remote](const TransitHop* hop) -> bool {
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.upstream == remote;
},
[](TransitHop* h) -> IHopHandler* { return h; });
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
}
IHopHandler*
PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id)
{
return MapGet(m_TransitPaths, id,
[remote](const TransitHop* hop) -> bool {
[remote](const std::shared_ptr< TransitHop >& hop) -> bool {
return hop->info.downstream == remote;
},
[](TransitHop* h) -> IHopHandler* { return h; });
[](const std::shared_ptr< TransitHop >& h) -> IHopHandler* {
return h.get();
});
}
PathSet*
@ -215,14 +220,14 @@ namespace llarp
for(auto i = range.first; i != range.second; ++i)
{
if(i->second->info.upstream == us)
return i->second;
return i->second.get();
}
}
return nullptr;
}
void
PathContext::PutTransitHop(TransitHop* hop)
PathContext::PutTransitHop(std::shared_ptr< TransitHop > hop)
{
MapPut(m_TransitPaths, hop->info.txID, hop);
MapPut(m_TransitPaths, hop->info.rxID, hop);
@ -235,23 +240,16 @@ namespace llarp
auto now = llarp_time_now_ms();
auto& map = m_TransitPaths.second;
auto itr = map.begin();
std::set< TransitHop* > removePaths;
while(itr != map.end())
{
if(itr->second->Expired(now))
{
TransitHop* path = itr->second;
llarp::LogDebug("transit path expired ", path->info);
removePaths.insert(path);
itr = map.erase(itr);
}
++itr;
}
for(auto& p : removePaths)
{
map.erase(p->info.txID);
map.erase(p->info.rxID);
delete p;
else
++itr;
}
for(auto& builder : m_PathBuilders)
{
if(builder)
@ -298,7 +296,7 @@ namespace llarp
for(auto i = range.first; i != range.second; ++i)
{
if(i->second->info.upstream == us)
return i->second;
return i->second.get();
}
}
return nullptr;

View file

@ -169,7 +169,7 @@ namespace llarp
// decrypted record
LR_CommitRecord record;
// the actual hop
Hop* hop;
std::shared_ptr< Hop > hop;
LRCMFrameDecrypt(Context* ctx, Decrypter* dec,
const LR_CommitMessage* commit)
@ -196,6 +196,8 @@ namespace llarp
self->hop->ExpireTime());
self->context->Router()->PersistSessionUntil(self->hop->info.upstream,
self->hop->ExpireTime());
// put hop
self->context->PutTransitHop(self->hop);
// forward to next hop
self->context->ForwardLRCM(self->hop->info.upstream, self->frames);
self->hop = nullptr;
@ -210,6 +212,8 @@ namespace llarp
// persist session to downstream until path expiration
self->context->Router()->PersistSessionUntil(self->hop->info.downstream,
self->hop->ExpireTime());
// put hop
self->context->PutTransitHop(self->hop);
// send path confirmation
llarp::routing::PathConfirmMessage confirm(self->hop->lifetime);
if(!self->hop->SendRoutingMessage(&confirm, self->context->Router()))
@ -279,8 +283,6 @@ namespace llarp
// TODO: check if we really want to accept it
self->hop->started = llarp_time_now_ms();
llarp::LogDebug("Accepted ", self->hop->info);
self->context->PutTransitHop(self->hop);
size_t sz = self->frames[0].size();
// shift

View file

@ -3,7 +3,7 @@
#include <llarp/iwp.hpp>
#include <llarp/link_message.hpp>
#include <llarp/link/utp.hpp>
#include <llarp/arpc.hpp>
#include <llarp/rpc.hpp>
#include "buffer.hpp"
#include "encode.hpp"
@ -655,7 +655,7 @@ llarp_router::Run()
{
rpcBindAddr = DefaultRPCBindAddr;
}
rpcServer = std::make_unique< llarp::arpc::Server >(this);
rpcServer = std::make_unique< llarp::rpc::Server >(this);
if(!rpcServer->Start(rpcBindAddr))
{
llarp::LogError("Binding rpc server to ", rpcBindAddr, " failed");
@ -1252,19 +1252,4 @@ namespace llarp
}
}
}
namespace arpc
{
const byte_t *
Server::SigningPrivateKey() const
{
return router->identity;
}
const llarp_crypto *
Server::Crypto() const
{
return &router->crypto;
}
} // namespace arpc
} // namespace llarp

View file

@ -5,7 +5,7 @@
#include <llarp/router_contact.hpp>
#include <llarp/path.hpp>
#include <llarp/link_layer.hpp>
#include <llarp/arpc.hpp>
#include <llarp/rpc.hpp>
#include <functional>
#include <list>
@ -105,7 +105,7 @@ struct llarp_router
std::string DefaultRPCBindAddr = "127.0.0.1:1190";
bool enableRPCServer = false;
std::unique_ptr< llarp::arpc::Server > rpcServer;
std::unique_ptr< llarp::rpc::Server > rpcServer;
std::string rpcBindAddr = DefaultRPCBindAddr;
std::unique_ptr< llarp::ILinkLayer > outboundLink;

40
llarp/rpc.cpp Normal file
View file

@ -0,0 +1,40 @@
#include <llarp/rpc.hpp>
#include <libabyss.hpp>
namespace llarp
{
namespace rpc
{
struct ServerImpl : public ::abyss::http::BaseReqHandler
{
llarp_router* router;
ServerImpl(llarp_router* r)
: ::abyss::http::BaseReqHandler(2000), router(r)
{
}
bool
Start(const std::string& addr)
{
return false;
}
};
Server::Server(llarp_router* r) : m_Impl(new ServerImpl(r))
{
}
Server::~Server()
{
delete m_Impl;
}
bool
Server::Start(const std::string& addr)
{
return m_Impl->Start(addr);
}
} // namespace rpc
} // namespace llarp

View file

@ -186,7 +186,7 @@ namespace llarp
}
}
}
#ifdef TESTNET
// prefetch tags
for(const auto& tag : m_PrefetchTags)
{
@ -201,12 +201,12 @@ namespace llarp
{
if(HasPendingPathToService(introset.A.Addr()))
continue;
if(!EnsurePathToService(introset.A.Addr(),
[](Address addr, OutboundContext* ctx) {},
10000))
byte_t tmp[1024] = {0};
auto buf = StackBuffer< decltype(tmp) >(tmp);
if(!SendToOrQueue(introset.A.Addr(), buf, eProtocolText))
{
llarp::LogWarn("failed to ensure path to ", introset.A.Addr(),
" for tag ", tag.ToString());
llarp::LogWarn(Name(), " failed to send/queue data to ",
introset.A.Addr(), " for tag ", tag.ToString());
}
}
itr->second.Expire(now);
@ -216,10 +216,16 @@ namespace llarp
if(path)
{
auto job = new TagLookupJob(this, &itr->second);
job->SendRequestViaPath(path, Router());
if(!job->SendRequestViaPath(path, Router()))
llarp::LogError(Name(), " failed to send tag lookup");
}
else
{
llarp::LogError(Name(), " has no paths for tag lookup");
}
}
}
#endif
// tick remote sessions
{