From 139d48b7906e9c87b49c6c24081ea58d378c8fea Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 08:10:44 -0400 Subject: [PATCH 01/12] inbound queue fixes --- llarp/codel.hpp | 4 +--- llarp/iwp_link.cpp | 14 +++++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/llarp/codel.hpp b/llarp/codel.hpp index 98e65453d..4046165b9 100644 --- a/llarp/codel.hpp +++ b/llarp/codel.hpp @@ -15,12 +15,10 @@ namespace llarp { }; - template < typename Mutex_t > struct DummyLock { - DummyLock(const Mutex_t& mtx){ + DummyLock(const DummyMutex& mtx){}; - }; ~DummyLock() { } diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 9b4ea0c5e..0e88c911c 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -516,10 +516,13 @@ namespace iwp while(q.size()) { // TODO: is this right? - nextMsgID = std::max(nextMsgID, q.front()->msgid); - if(!router->HandleRecvLinkMessage(parent, q.front()->Buffer())) - llarp::Warn("failed to process inbound message ", q.front()->msgid); - delete q.front(); + auto &front = q.front(); + nextMsgID = std::max(nextMsgID, front->msgid); + if(!router->HandleRecvLinkMessage(parent, front->Buffer())) + { + llarp::Warn("failed to process inbound message ", front->msgid); + } + delete front; q.pop(); } // TODO: this isn't right @@ -1796,7 +1799,8 @@ namespace iwp } else if(recvqueue.Size() > 2) { - return process_inbound_queue(); + recvqueue.Put(new InboundMessage(id, msg)); + success = process_inbound_queue(); } } else From d34937118de161bc297421d508f08e713ecf9d70 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 08:12:30 -0400 Subject: [PATCH 02/12] make it compile :DDDD --- llarp/iwp_link.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 0e88c911c..b728e1240 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -486,8 +486,7 @@ namespace iwp typedef std::queue< sendbuf_t * > sendqueue_t; typedef llarp::util::CoDelQueue< InboundMessage *, InboundMessage::GetTime, InboundMessage::PutTime, - llarp::util::DummyMutex, - llarp::util::DummyLock< llarp::util::DummyMutex > > + llarp::util::DummyMutex, llarp::util::DummyLock > recvqueue_t; llarp_router *router = nullptr; From 8a682a8b833b8c5d5508255e83f536c6f59fa60a Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 08:21:56 -0400 Subject: [PATCH 03/12] fix previous commit --- llarp/iwp_link.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index b728e1240..368079871 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -516,7 +516,8 @@ namespace iwp { // TODO: is this right? auto &front = q.front(); - nextMsgID = std::max(nextMsgID, front->msgid); + // the items are already sorted anyways so this doesn't really do much + nextMsgID = std::max(nextMsgID, front->msgid); if(!router->HandleRecvLinkMessage(parent, front->Buffer())) { llarp::Warn("failed to process inbound message ", front->msgid); @@ -1796,7 +1797,7 @@ namespace iwp } ++nextMsgID; } - else if(recvqueue.Size() > 2) + else { recvqueue.Put(new InboundMessage(id, msg)); success = process_inbound_queue(); From 132e57b6debd7a591eade36cf731676162d6e5b9 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 08:24:10 -0400 Subject: [PATCH 04/12] bump version --- include/llarp/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/llarp/version.h b/include/llarp/version.h index 7eacf3a42..e7e0484db 100644 --- a/include/llarp/version.h +++ b/include/llarp/version.h @@ -10,7 +10,7 @@ #endif #ifndef LLARP_VERSION_PATCH -#define LLARP_VERSION_PATCH "0" +#define LLARP_VERSION_PATCH "1" #endif #ifndef LLARP_VERSION_NUM From 0213ada22c954dfe8a43fc40534844f879cf24c1 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 08:30:46 -0400 Subject: [PATCH 05/12] don't lock --- llarp/iwp_link.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 368079871..3a712be2e 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -1560,11 +1560,11 @@ namespace iwp { server *serv = static_cast< server * >(l->impl); { - lock_t lock(serv->m_Connected_Mutex); + // lock_t lock(serv->m_Connected_Mutex); auto itr = serv->m_Connected.find(pubkey); if(itr != serv->m_Connected.end()) { - lock_t innerlock(serv->m_sessions_Mutex); + // lock_t innerlock(serv->m_sessions_Mutex); auto inner_itr = serv->m_sessions.find(itr->second); if(inner_itr != serv->m_sessions.end()) { From 4ca34995bfbef1c9b4ed106416cfa3f474808cbf Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:13:56 -0400 Subject: [PATCH 06/12] add inbound server connections to DHT --- .vscode/settings.json | 3 ++- include/llarp/nodedb.h | 2 +- include/llarp/router_contact.h | 3 +++ llarp/address_info.cpp | 10 ++++++++++ llarp/codel.hpp | 3 ++- llarp/iwp_link.cpp | 20 ++++++++++++++++---- llarp/link_intro.cpp | 5 +++-- llarp/router.cpp | 7 +++---- llarp/router.hpp | 2 +- llarp/router_contact.cpp | 6 ++++++ 10 files changed, 47 insertions(+), 14 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index f974c0315..e360b8362 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -57,6 +57,7 @@ "io": "cpp", "strstream": "cpp", "numeric": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "*.ipp": "cpp" } } \ No newline at end of file diff --git a/include/llarp/nodedb.h b/include/llarp/nodedb.h index b04e36fa6..5c798fc10 100644 --- a/include/llarp/nodedb.h +++ b/include/llarp/nodedb.h @@ -94,7 +94,7 @@ struct llarp_async_verify_rc struct llarp_threadpool *cryptoworker; struct llarp_threadpool *diskworker; - /// router contact (should this be a pointer?) + /// router contact struct llarp_rc rc; /// result bool valid; diff --git a/include/llarp/router_contact.h b/include/llarp/router_contact.h index cd8c262bd..f9b3e12ca 100644 --- a/include/llarp/router_contact.h +++ b/include/llarp/router_contact.h @@ -53,6 +53,9 @@ llarp_rc_verify_sig(struct llarp_crypto *crypto, struct llarp_rc *rc); void llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src); +bool +llarp_rc_is_public_router(const struct llarp_rc *const rc); + void llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, struct llarp_ai_list *addr); diff --git a/llarp/address_info.cpp b/llarp/address_info.cpp index 47d56ff8a..10b678f88 100644 --- a/llarp/address_info.cpp +++ b/llarp/address_info.cpp @@ -218,6 +218,16 @@ llarp_ai_list_pushback(struct llarp_ai_list *l, struct llarp_ai *ai) l->list.push_back(*ai); } +size_t +llarp_ai_list_size(struct llarp_ai_list *l) +{ + if(l) + { + return l->list.size(); + } + return 0; +} + void llarp_ai_list_iterate(struct llarp_ai_list *l, struct llarp_ai_list_iter *itr) { diff --git a/llarp/codel.hpp b/llarp/codel.hpp index 4046165b9..0fbf22b29 100644 --- a/llarp/codel.hpp +++ b/llarp/codel.hpp @@ -62,8 +62,9 @@ namespace llarp firstPut = GetTime()(i); } + template < typename Queue_t > void - Process(std::queue< T >& result) + Process(Queue_t& result) { llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL; // auto start = llarp_time_now_ms(); diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 3a712be2e..393e23b4d 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -443,9 +443,10 @@ namespace iwp } bool - operator<(const InboundMessage &other) const + operator>(const InboundMessage &other) const { - return msgid < other.msgid; + // order in ascending order for codel queue + return msgid > other.msgid; } llarp_buffer_t @@ -463,6 +464,15 @@ namespace iwp } }; + struct OrderCompare + { + bool + operator()(const InboundMessage *left, const InboundMessage *right) + { + return left->msgid > right->msgid; + } + }; + struct PutTime { void @@ -510,12 +520,14 @@ namespace iwp bool process_inbound_queue() { - std::queue< InboundMessage * > q; + std::priority_queue< InboundMessage *, std::vector< InboundMessage * >, + InboundMessage::OrderCompare > + q; recvqueue.Process(q); while(q.size()) { // TODO: is this right? - auto &front = q.front(); + auto &front = q.top(); // the items are already sorted anyways so this doesn't really do much nextMsgID = std::max(nextMsgID, front->msgid); if(!router->HandleRecvLinkMessage(parent, front->Buffer())) diff --git a/llarp/link_intro.cpp b/llarp/link_intro.cpp index 43ed32b43..38d394d1a 100644 --- a/llarp/link_intro.cpp +++ b/llarp/link_intro.cpp @@ -2,6 +2,7 @@ #include #include #include "logger.hpp" +#include "router.hpp" namespace llarp { @@ -71,7 +72,7 @@ namespace llarp bool LinkIntroMessage::HandleMessage(llarp_router* router) const { - llarp::Info("got LIM from ", remote); + router->async_verify_RC(RC, !llarp_rc_is_public_router(RC)); return true; } -} +} // namespace llarp diff --git a/llarp/router.cpp b/llarp/router.cpp index 7a7f052f4..c12c983ee 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -526,7 +526,7 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job) { // llarp::Debug("try_connect got session"); auto session = job->session; - router->async_verify_RC(session, false, job); + router->async_verify_RC(session->get_remote_router(session), false, job); return; } // llarp::Debug("try_connect no session"); @@ -563,8 +563,7 @@ llarp_router::DiscardOutboundFor(const llarp::RouterID &remote) } void -llarp_router::async_verify_RC(llarp_link_session *session, - bool isExpectingClient, +llarp_router::async_verify_RC(llarp_rc *rc, bool isExpectingClient, llarp_link_establish_job *establish_job) { llarp_async_verify_rc *job = new llarp_async_verify_rc; @@ -579,7 +578,7 @@ llarp_router::async_verify_RC(llarp_link_session *session, job->cryptoworker = tp; job->diskworker = disk; - llarp_rc_copy(&job->rc, session->get_remote_router(session)); + llarp_rc_copy(&job->rc, rc); if(isExpectingClient) job->hook = &llarp_router::on_verify_client_rc; else diff --git a/llarp/router.hpp b/llarp/router.hpp index e7047ddfa..e4557af9d 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -191,7 +191,7 @@ struct llarp_router ScheduleTicker(uint64_t i = 1000); void - async_verify_RC(llarp_link_session *session, bool isExpectingClient, + async_verify_RC(llarp_rc *rc, bool isExpectingClient, llarp_link_establish_job *job = nullptr); static bool diff --git a/llarp/router_contact.cpp b/llarp/router_contact.cpp index cf83b3bfb..d144164e0 100644 --- a/llarp/router_contact.cpp +++ b/llarp/router_contact.cpp @@ -101,6 +101,12 @@ llarp_rc_decode_dict(struct dict_reader *r, llarp_buffer_t *key) return false; } +bool +llarp_rc_is_public_router(const struct llarp_rc *const rc) +{ + return rc->addrs && llarp_ai_list_size(rc->addrs) > 0; +} + void llarp_rc_copy(struct llarp_rc *dst, const struct llarp_rc *src) { From ab509e0a1373727c8c8cb89db629f53ff343c839 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:33:37 -0400 Subject: [PATCH 07/12] try not to crash --- include/llarp/link.h | 12 +++++++----- llarp/router.cpp | 22 +++++++++++++++++++++- llarp/router.hpp | 3 +++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/include/llarp/link.h b/include/llarp/link.h index 59c53a823..1dffe646e 100644 --- a/include/llarp/link.h +++ b/include/llarp/link.h @@ -22,8 +22,8 @@ extern "C" { struct llarp_link; /** - * wire layer transport session for point to point communication between us and - * another + * wire layer transport session for point to point communication between us + * and another */ struct llarp_link_session; @@ -75,8 +75,9 @@ struct llarp_link const char *(*name)(void); void (*get_our_address)(struct llarp_link *, struct llarp_ai *); /* - int (*register_listener)(struct llarp_link *, struct llarp_link_ev_listener); - void (*deregister_listener)(struct llarp_link *, int); + int (*register_listener)(struct llarp_link *, struct + llarp_link_ev_listener); void (*deregister_listener)(struct llarp_link *, + int); */ bool (*configure)(struct llarp_link *, struct llarp_ev_loop *, const char *, int, uint16_t); @@ -101,7 +102,8 @@ llarp_link_initialized(struct llarp_link *link); struct llarp_link_session { void *impl; - /** send an entire message, splits up into smaller pieces and does encryption + /** send an entire message, splits up into smaller pieces and does + * encryption */ bool (*sendto)(struct llarp_link_session *, llarp_buffer_t); /** return true if this session is timed out */ diff --git a/llarp/router.cpp b/llarp/router.cpp index c12c983ee..80a31796a 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -348,13 +348,15 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) llarp_dht_put_peer(router->dht, &router->validRouters[pk]); // this was an outbound establish job - if(ctx->establish_job->session) + if(ctx->establish_job) { auto session = ctx->establish_job->session; router->FlushOutboundFor(pk, session->get_parent(session)); // this frees the job router->pendingEstablishJobs.erase(pk); } + else // this was an inbound session + router->FlushOutboundFor(pk, GetLinkWithSessionByPubkey(pk)); } void @@ -482,6 +484,19 @@ llarp_router::SessionClosed(const llarp::RouterID &remote) validRouters.erase(itr); } +llarp_link * +llarp_router::GetLinkWithSessionByPubkey(const llarp::RouterID &pubkey) +{ + for(auto &link : inboundLinks) + { + if(link->has_session_to(link, pubkey)) + return link; + } + if(outboundLink->has_session_to(outboundLink, pubkey)) + return outboundLink; + return nullptr; +} + void llarp_router::FlushOutboundFor(const llarp::RouterID &remote, llarp_link *chosen) @@ -492,6 +507,11 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote, { return; } + if(!chosen) + { + DiscardOutboundFor(remote); + return; + } while(itr->second.size()) { auto buf = llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer); diff --git a/llarp/router.hpp b/llarp/router.hpp index e4557af9d..2b59a7ab7 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -190,6 +190,9 @@ struct llarp_router void ScheduleTicker(uint64_t i = 1000); + llarp_link * + GetLinkWithSessionByPubkey(const llarp::RouterID &remote); + void async_verify_RC(llarp_rc *rc, bool isExpectingClient, llarp_link_establish_job *job = nullptr); From 2b8cd93d6a2c1e11de1f248b8bf11002595b3471 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:34:46 -0400 Subject: [PATCH 08/12] make it compile :^) --- llarp/router.cpp | 493 ++++++++++++++++++++++++----------------------- 1 file changed, 247 insertions(+), 246 deletions(-) diff --git a/llarp/router.cpp b/llarp/router.cpp index 80a31796a..06f94e493 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -356,7 +356,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job) router->pendingEstablishJobs.erase(pk); } else // this was an inbound session - router->FlushOutboundFor(pk, GetLinkWithSessionByPubkey(pk)); + router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); } void @@ -816,271 +816,272 @@ llarp_router::HasPendingConnectJob(const llarp::RouterID &remote) return pendingEstablishJobs.find(remote) != pendingEstablishJobs.end(); } -extern "C" { -struct llarp_router * -llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop, - struct llarp_logic *logic) +extern "C" { - llarp_router *router = new llarp_router(); - if(router) + struct llarp_router * + llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop, + struct llarp_logic *logic) { - router->netloop = netloop; - router->tp = tp; - router->logic = logic; - // TODO: make disk io threadpool count configurable -#ifdef TESTNET - router->disk = tp; -#else - router->disk = llarp_init_threadpool(1, "llarp-diskio"); -#endif - llarp_crypto_libsodium_init(&router->crypto); - } - return router; -} - -bool -llarp_configure_router(struct llarp_router *router, struct llarp_config *conf) -{ - llarp_config_iterator iter; - iter.user = router; - iter.visit = llarp::router_iter_config; - llarp_config_iter(conf, &iter); - if(!router->InitOutboundLink()) - return false; - if(!router->Ready()) - { - return false; - } - return router->EnsureIdentity(); -} - -void -llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb) -{ - router->nodedb = nodedb; - router->Run(); -} - -bool -llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote, - uint16_t numretries) -{ - char ftmp[68] = {0}; - const char *hexname = - llarp::HexEncode< llarp::PubKey, decltype(ftmp) >(remote->pubkey, ftmp); - - // do we already have a pending job for this remote? - if(router->HasPendingConnectJob(remote->pubkey)) - { - llarp::Debug("We have pending connect jobs to ", hexname); - return false; - } - // try first address only - llarp_ai addr; - if(llarp_ai_list_index(remote->addrs, 0, &addr)) - { - auto link = router->outboundLink; - auto itr = router->pendingEstablishJobs.emplace( - std::make_pair(remote->pubkey, llarp_link_establish_job())); - auto job = &itr.first->second; - llarp_ai_copy(&job->ai, &addr); - memcpy(job->pubkey, remote->pubkey, PUBKEYSIZE); - job->retries = numretries; - job->timeout = 10000; - job->result = &llarp_router::on_try_connect_result; - // give router as user pointer - job->user = router; - // try establishing - link->try_establish(link, job); - return true; - } - llarp::Warn("couldn't get first address for ", hexname); - return false; -} - -void -llarp_rc_clear(struct llarp_rc *rc) -{ - // zero out router contact - llarp::Zero(rc, sizeof(llarp_rc)); -} - -void -llarp_rc_set_pubenckey(struct llarp_rc *rc, const uint8_t *pubenckey) -{ - // set public encryption key - memcpy(rc->enckey, pubenckey, PUBKEYSIZE); -} - -void -llarp_rc_set_pubsigkey(struct llarp_rc *rc, const uint8_t *pubsigkey) -{ - // set public signing key - memcpy(rc->pubkey, pubsigkey, PUBKEYSIZE); -} - -void -llarp_rc_set_pubkey(struct llarp_rc *rc, const uint8_t *pubenckey, - const uint8_t *pubsigkey) -{ - // set public encryption key - llarp_rc_set_pubenckey(rc, pubenckey); - // set public signing key - llarp_rc_set_pubsigkey(rc, pubsigkey); -} - -struct llarp_rc * -llarp_rc_read(const char *fpath) -{ - fs::path our_rc_file(fpath); - std::error_code ec; - if(!fs::exists(our_rc_file, ec)) - { - printf("File[%s] not found\n", fpath); - return 0; - } - std::ifstream f(our_rc_file, std::ios::binary); - if(!f.is_open()) - { - printf("Can't open file [%s]\n", fpath); - return 0; - } - byte_t tmp[MAX_RC_SIZE]; - llarp_buffer_t buf = llarp::StackBuffer< decltype(tmp) >(tmp); - f.seekg(0, std::ios::end); - size_t sz = f.tellg(); - f.seekg(0, std::ios::beg); - - if(sz > buf.sz) - return 0; - - f.read((char *)buf.base, sz); - // printf("contents[%s]\n", tmpc); - llarp_rc *rc = new llarp_rc; - llarp::Zero(rc, sizeof(llarp_rc)); - if(!llarp_rc_bdecode(rc, &buf)) - { - printf("Can't decode [%s]\n", fpath); - return 0; - } - return rc; -} - -bool -llarp_rc_addr_list_iter(struct llarp_ai_list_iter *iter, struct llarp_ai *ai) -{ - struct llarp_rc *rc = (llarp_rc *)iter->user; - llarp_ai_list_pushback(rc->addrs, ai); - return true; -} - -void -llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, - struct llarp_ai_list *addr) -{ - rc->addrs = llarp_ai_list_new(); - struct llarp_ai_list_iter ai_itr; - ai_itr.user = rc; - ai_itr.visit = &llarp_rc_addr_list_iter; - llarp_ai_list_iterate(addr, &ai_itr); -} - -bool -llarp_rc_write(struct llarp_rc *rc, const char *fpath) -{ - fs::path our_rc_file(fpath); - byte_t tmp[MAX_RC_SIZE]; - auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - - if(llarp_rc_bencode(rc, &buf)) - { - std::ofstream f(our_rc_file, std::ios::binary); - if(f.is_open()) + llarp_router *router = new llarp_router(); + if(router) { - f.write((char *)buf.base, buf.cur - buf.base); + router->netloop = netloop; + router->tp = tp; + router->logic = logic; + // TODO: make disk io threadpool count configurable +#ifdef TESTNET + router->disk = tp; +#else + router->disk = llarp_init_threadpool(1, "llarp-diskio"); +#endif + llarp_crypto_libsodium_init(&router->crypto); + } + return router; + } + + bool + llarp_configure_router(struct llarp_router *router, struct llarp_config *conf) + { + llarp_config_iterator iter; + iter.user = router; + iter.visit = llarp::router_iter_config; + llarp_config_iter(conf, &iter); + if(!router->InitOutboundLink()) + return false; + if(!router->Ready()) + { + return false; + } + return router->EnsureIdentity(); + } + + void + llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb) + { + router->nodedb = nodedb; + router->Run(); + } + + bool + llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote, + uint16_t numretries) + { + char ftmp[68] = {0}; + const char *hexname = + llarp::HexEncode< llarp::PubKey, decltype(ftmp) >(remote->pubkey, ftmp); + + // do we already have a pending job for this remote? + if(router->HasPendingConnectJob(remote->pubkey)) + { + llarp::Debug("We have pending connect jobs to ", hexname); + return false; + } + // try first address only + llarp_ai addr; + if(llarp_ai_list_index(remote->addrs, 0, &addr)) + { + auto link = router->outboundLink; + auto itr = router->pendingEstablishJobs.emplace( + std::make_pair(remote->pubkey, llarp_link_establish_job())); + auto job = &itr.first->second; + llarp_ai_copy(&job->ai, &addr); + memcpy(job->pubkey, remote->pubkey, PUBKEYSIZE); + job->retries = numretries; + job->timeout = 10000; + job->result = &llarp_router::on_try_connect_result; + // give router as user pointer + job->user = router; + // try establishing + link->try_establish(link, job); return true; } + llarp::Warn("couldn't get first address for ", hexname); + return false; } - return false; -} -void -llarp_rc_sign(llarp_crypto *crypto, const byte_t *seckey, struct llarp_rc *rc) -{ - byte_t buf[MAX_RC_SIZE]; - auto signbuf = llarp::StackBuffer< decltype(buf) >(buf); - // zero out previous signature - llarp::Zero(rc->signature, sizeof(rc->signature)); - // encode - if(llarp_rc_bencode(rc, &signbuf)) + void + llarp_rc_clear(struct llarp_rc *rc) { - // sign - signbuf.sz = signbuf.cur - signbuf.base; - crypto->sign(rc->signature, seckey, signbuf); + // zero out router contact + llarp::Zero(rc, sizeof(llarp_rc)); } -} -void -llarp_stop_router(struct llarp_router *router) -{ - if(router) - router->Close(); -} - -void -llarp_router_iterate_links(struct llarp_router *router, - struct llarp_router_link_iter i) -{ - for(auto link : router->inboundLinks) - if(!i.visit(&i, router, link)) - return; - i.visit(&i, router, router->outboundLink); -} - -void -llarp_free_router(struct llarp_router **router) -{ - if(*router) + void + llarp_rc_set_pubenckey(struct llarp_rc *rc, const uint8_t *pubenckey) { - delete *router; + // set public encryption key + memcpy(rc->enckey, pubenckey, PUBKEYSIZE); } - *router = nullptr; -} -void -llarp_router_override_path_selection(struct llarp_router *router, - llarp_pathbuilder_select_hop_func func) -{ - if(func) - router->selectHopFunc = func; -} - -bool -llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, - byte_t *secretkey) -{ - llarp::Debug("find or create ", fpath); - fs::path path(fpath); - std::error_code ec; - if(!fs::exists(path, ec)) + void + llarp_rc_set_pubsigkey(struct llarp_rc *rc, const uint8_t *pubsigkey) { - llarp::Info("generating new identity key"); - crypto->identity_keygen(secretkey); - std::ofstream f(path, std::ios::binary); - if(f.is_open()) + // set public signing key + memcpy(rc->pubkey, pubsigkey, PUBKEYSIZE); + } + + void + llarp_rc_set_pubkey(struct llarp_rc *rc, const uint8_t *pubenckey, + const uint8_t *pubsigkey) + { + // set public encryption key + llarp_rc_set_pubenckey(rc, pubenckey); + // set public signing key + llarp_rc_set_pubsigkey(rc, pubsigkey); + } + + struct llarp_rc * + llarp_rc_read(const char *fpath) + { + fs::path our_rc_file(fpath); + std::error_code ec; + if(!fs::exists(our_rc_file, ec)) { - f.write((char *)secretkey, SECKEYSIZE); + printf("File[%s] not found\n", fpath); + return 0; } + std::ifstream f(our_rc_file, std::ios::binary); + if(!f.is_open()) + { + printf("Can't open file [%s]\n", fpath); + return 0; + } + byte_t tmp[MAX_RC_SIZE]; + llarp_buffer_t buf = llarp::StackBuffer< decltype(tmp) >(tmp); + f.seekg(0, std::ios::end); + size_t sz = f.tellg(); + f.seekg(0, std::ios::beg); + + if(sz > buf.sz) + return 0; + + f.read((char *)buf.base, sz); + // printf("contents[%s]\n", tmpc); + llarp_rc *rc = new llarp_rc; + llarp::Zero(rc, sizeof(llarp_rc)); + if(!llarp_rc_bdecode(rc, &buf)) + { + printf("Can't decode [%s]\n", fpath); + return 0; + } + return rc; } - std::ifstream f(path, std::ios::binary); - if(f.is_open()) + + bool + llarp_rc_addr_list_iter(struct llarp_ai_list_iter *iter, struct llarp_ai *ai) { - f.read((char *)secretkey, SECKEYSIZE); + struct llarp_rc *rc = (llarp_rc *)iter->user; + llarp_ai_list_pushback(rc->addrs, ai); return true; } - llarp::Info("failed to get identity key"); - return false; -} + + void + llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, + struct llarp_ai_list *addr) + { + rc->addrs = llarp_ai_list_new(); + struct llarp_ai_list_iter ai_itr; + ai_itr.user = rc; + ai_itr.visit = &llarp_rc_addr_list_iter; + llarp_ai_list_iterate(addr, &ai_itr); + } + + bool + llarp_rc_write(struct llarp_rc *rc, const char *fpath) + { + fs::path our_rc_file(fpath); + byte_t tmp[MAX_RC_SIZE]; + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); + + if(llarp_rc_bencode(rc, &buf)) + { + std::ofstream f(our_rc_file, std::ios::binary); + if(f.is_open()) + { + f.write((char *)buf.base, buf.cur - buf.base); + return true; + } + } + return false; + } + + void + llarp_rc_sign(llarp_crypto *crypto, const byte_t *seckey, struct llarp_rc *rc) + { + byte_t buf[MAX_RC_SIZE]; + auto signbuf = llarp::StackBuffer< decltype(buf) >(buf); + // zero out previous signature + llarp::Zero(rc->signature, sizeof(rc->signature)); + // encode + if(llarp_rc_bencode(rc, &signbuf)) + { + // sign + signbuf.sz = signbuf.cur - signbuf.base; + crypto->sign(rc->signature, seckey, signbuf); + } + } + + void + llarp_stop_router(struct llarp_router *router) + { + if(router) + router->Close(); + } + + void + llarp_router_iterate_links(struct llarp_router *router, + struct llarp_router_link_iter i) + { + for(auto link : router->inboundLinks) + if(!i.visit(&i, router, link)) + return; + i.visit(&i, router, router->outboundLink); + } + + void + llarp_free_router(struct llarp_router **router) + { + if(*router) + { + delete *router; + } + *router = nullptr; + } + + void + llarp_router_override_path_selection(struct llarp_router *router, + llarp_pathbuilder_select_hop_func func) + { + if(func) + router->selectHopFunc = func; + } + + bool + llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, + byte_t *secretkey) + { + llarp::Debug("find or create ", fpath); + fs::path path(fpath); + std::error_code ec; + if(!fs::exists(path, ec)) + { + llarp::Info("generating new identity key"); + crypto->identity_keygen(secretkey); + std::ofstream f(path, std::ios::binary); + if(f.is_open()) + { + f.write((char *)secretkey, SECKEYSIZE); + } + } + std::ifstream f(path, std::ios::binary); + if(f.is_open()) + { + f.read((char *)secretkey, SECKEYSIZE); + return true; + } + llarp::Info("failed to get identity key"); + return false; + } } // end extern C From cc23d8ddbda09b2ceb1ddb35bdffcda5383c1d7f Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:34:53 -0400 Subject: [PATCH 09/12] format --- llarp/router.cpp | 463 +++++++++++++++++++++++------------------------ 1 file changed, 231 insertions(+), 232 deletions(-) diff --git a/llarp/router.cpp b/llarp/router.cpp index 06f94e493..e0bd3d539 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -816,272 +816,271 @@ llarp_router::HasPendingConnectJob(const llarp::RouterID &remote) return pendingEstablishJobs.find(remote) != pendingEstablishJobs.end(); } -extern "C" +extern "C" { +struct llarp_router * +llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop, + struct llarp_logic *logic) { - struct llarp_router * - llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop, - struct llarp_logic *logic) + llarp_router *router = new llarp_router(); + if(router) { - llarp_router *router = new llarp_router(); - if(router) - { - router->netloop = netloop; - router->tp = tp; - router->logic = logic; - // TODO: make disk io threadpool count configurable + router->netloop = netloop; + router->tp = tp; + router->logic = logic; + // TODO: make disk io threadpool count configurable #ifdef TESTNET - router->disk = tp; + router->disk = tp; #else - router->disk = llarp_init_threadpool(1, "llarp-diskio"); + router->disk = llarp_init_threadpool(1, "llarp-diskio"); #endif - llarp_crypto_libsodium_init(&router->crypto); - } - return router; + llarp_crypto_libsodium_init(&router->crypto); } + return router; +} - bool - llarp_configure_router(struct llarp_router *router, struct llarp_config *conf) +bool +llarp_configure_router(struct llarp_router *router, struct llarp_config *conf) +{ + llarp_config_iterator iter; + iter.user = router; + iter.visit = llarp::router_iter_config; + llarp_config_iter(conf, &iter); + if(!router->InitOutboundLink()) + return false; + if(!router->Ready()) { - llarp_config_iterator iter; - iter.user = router; - iter.visit = llarp::router_iter_config; - llarp_config_iter(conf, &iter); - if(!router->InitOutboundLink()) - return false; - if(!router->Ready()) - { - return false; - } - return router->EnsureIdentity(); - } - - void - llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb) - { - router->nodedb = nodedb; - router->Run(); - } - - bool - llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote, - uint16_t numretries) - { - char ftmp[68] = {0}; - const char *hexname = - llarp::HexEncode< llarp::PubKey, decltype(ftmp) >(remote->pubkey, ftmp); - - // do we already have a pending job for this remote? - if(router->HasPendingConnectJob(remote->pubkey)) - { - llarp::Debug("We have pending connect jobs to ", hexname); - return false; - } - // try first address only - llarp_ai addr; - if(llarp_ai_list_index(remote->addrs, 0, &addr)) - { - auto link = router->outboundLink; - auto itr = router->pendingEstablishJobs.emplace( - std::make_pair(remote->pubkey, llarp_link_establish_job())); - auto job = &itr.first->second; - llarp_ai_copy(&job->ai, &addr); - memcpy(job->pubkey, remote->pubkey, PUBKEYSIZE); - job->retries = numretries; - job->timeout = 10000; - job->result = &llarp_router::on_try_connect_result; - // give router as user pointer - job->user = router; - // try establishing - link->try_establish(link, job); - return true; - } - llarp::Warn("couldn't get first address for ", hexname); return false; } + return router->EnsureIdentity(); +} - void - llarp_rc_clear(struct llarp_rc *rc) +void +llarp_run_router(struct llarp_router *router, struct llarp_nodedb *nodedb) +{ + router->nodedb = nodedb; + router->Run(); +} + +bool +llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote, + uint16_t numretries) +{ + char ftmp[68] = {0}; + const char *hexname = + llarp::HexEncode< llarp::PubKey, decltype(ftmp) >(remote->pubkey, ftmp); + + // do we already have a pending job for this remote? + if(router->HasPendingConnectJob(remote->pubkey)) { - // zero out router contact - llarp::Zero(rc, sizeof(llarp_rc)); + llarp::Debug("We have pending connect jobs to ", hexname); + return false; } - - void - llarp_rc_set_pubenckey(struct llarp_rc *rc, const uint8_t *pubenckey) + // try first address only + llarp_ai addr; + if(llarp_ai_list_index(remote->addrs, 0, &addr)) { - // set public encryption key - memcpy(rc->enckey, pubenckey, PUBKEYSIZE); - } - - void - llarp_rc_set_pubsigkey(struct llarp_rc *rc, const uint8_t *pubsigkey) - { - // set public signing key - memcpy(rc->pubkey, pubsigkey, PUBKEYSIZE); - } - - void - llarp_rc_set_pubkey(struct llarp_rc *rc, const uint8_t *pubenckey, - const uint8_t *pubsigkey) - { - // set public encryption key - llarp_rc_set_pubenckey(rc, pubenckey); - // set public signing key - llarp_rc_set_pubsigkey(rc, pubsigkey); - } - - struct llarp_rc * - llarp_rc_read(const char *fpath) - { - fs::path our_rc_file(fpath); - std::error_code ec; - if(!fs::exists(our_rc_file, ec)) - { - printf("File[%s] not found\n", fpath); - return 0; - } - std::ifstream f(our_rc_file, std::ios::binary); - if(!f.is_open()) - { - printf("Can't open file [%s]\n", fpath); - return 0; - } - byte_t tmp[MAX_RC_SIZE]; - llarp_buffer_t buf = llarp::StackBuffer< decltype(tmp) >(tmp); - f.seekg(0, std::ios::end); - size_t sz = f.tellg(); - f.seekg(0, std::ios::beg); - - if(sz > buf.sz) - return 0; - - f.read((char *)buf.base, sz); - // printf("contents[%s]\n", tmpc); - llarp_rc *rc = new llarp_rc; - llarp::Zero(rc, sizeof(llarp_rc)); - if(!llarp_rc_bdecode(rc, &buf)) - { - printf("Can't decode [%s]\n", fpath); - return 0; - } - return rc; - } - - bool - llarp_rc_addr_list_iter(struct llarp_ai_list_iter *iter, struct llarp_ai *ai) - { - struct llarp_rc *rc = (llarp_rc *)iter->user; - llarp_ai_list_pushback(rc->addrs, ai); + auto link = router->outboundLink; + auto itr = router->pendingEstablishJobs.emplace( + std::make_pair(remote->pubkey, llarp_link_establish_job())); + auto job = &itr.first->second; + llarp_ai_copy(&job->ai, &addr); + memcpy(job->pubkey, remote->pubkey, PUBKEYSIZE); + job->retries = numretries; + job->timeout = 10000; + job->result = &llarp_router::on_try_connect_result; + // give router as user pointer + job->user = router; + // try establishing + link->try_establish(link, job); return true; } + llarp::Warn("couldn't get first address for ", hexname); + return false; +} - void - llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, - struct llarp_ai_list *addr) +void +llarp_rc_clear(struct llarp_rc *rc) +{ + // zero out router contact + llarp::Zero(rc, sizeof(llarp_rc)); +} + +void +llarp_rc_set_pubenckey(struct llarp_rc *rc, const uint8_t *pubenckey) +{ + // set public encryption key + memcpy(rc->enckey, pubenckey, PUBKEYSIZE); +} + +void +llarp_rc_set_pubsigkey(struct llarp_rc *rc, const uint8_t *pubsigkey) +{ + // set public signing key + memcpy(rc->pubkey, pubsigkey, PUBKEYSIZE); +} + +void +llarp_rc_set_pubkey(struct llarp_rc *rc, const uint8_t *pubenckey, + const uint8_t *pubsigkey) +{ + // set public encryption key + llarp_rc_set_pubenckey(rc, pubenckey); + // set public signing key + llarp_rc_set_pubsigkey(rc, pubsigkey); +} + +struct llarp_rc * +llarp_rc_read(const char *fpath) +{ + fs::path our_rc_file(fpath); + std::error_code ec; + if(!fs::exists(our_rc_file, ec)) { - rc->addrs = llarp_ai_list_new(); - struct llarp_ai_list_iter ai_itr; - ai_itr.user = rc; - ai_itr.visit = &llarp_rc_addr_list_iter; - llarp_ai_list_iterate(addr, &ai_itr); + printf("File[%s] not found\n", fpath); + return 0; } - - bool - llarp_rc_write(struct llarp_rc *rc, const char *fpath) + std::ifstream f(our_rc_file, std::ios::binary); + if(!f.is_open()) { - fs::path our_rc_file(fpath); - byte_t tmp[MAX_RC_SIZE]; - auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - - if(llarp_rc_bencode(rc, &buf)) - { - std::ofstream f(our_rc_file, std::ios::binary); - if(f.is_open()) - { - f.write((char *)buf.base, buf.cur - buf.base); - return true; - } - } - return false; + printf("Can't open file [%s]\n", fpath); + return 0; } + byte_t tmp[MAX_RC_SIZE]; + llarp_buffer_t buf = llarp::StackBuffer< decltype(tmp) >(tmp); + f.seekg(0, std::ios::end); + size_t sz = f.tellg(); + f.seekg(0, std::ios::beg); - void - llarp_rc_sign(llarp_crypto *crypto, const byte_t *seckey, struct llarp_rc *rc) + if(sz > buf.sz) + return 0; + + f.read((char *)buf.base, sz); + // printf("contents[%s]\n", tmpc); + llarp_rc *rc = new llarp_rc; + llarp::Zero(rc, sizeof(llarp_rc)); + if(!llarp_rc_bdecode(rc, &buf)) { - byte_t buf[MAX_RC_SIZE]; - auto signbuf = llarp::StackBuffer< decltype(buf) >(buf); - // zero out previous signature - llarp::Zero(rc->signature, sizeof(rc->signature)); - // encode - if(llarp_rc_bencode(rc, &signbuf)) - { - // sign - signbuf.sz = signbuf.cur - signbuf.base; - crypto->sign(rc->signature, seckey, signbuf); - } + printf("Can't decode [%s]\n", fpath); + return 0; } + return rc; +} - void - llarp_stop_router(struct llarp_router *router) - { - if(router) - router->Close(); - } +bool +llarp_rc_addr_list_iter(struct llarp_ai_list_iter *iter, struct llarp_ai *ai) +{ + struct llarp_rc *rc = (llarp_rc *)iter->user; + llarp_ai_list_pushback(rc->addrs, ai); + return true; +} - void - llarp_router_iterate_links(struct llarp_router *router, - struct llarp_router_link_iter i) - { - for(auto link : router->inboundLinks) - if(!i.visit(&i, router, link)) - return; - i.visit(&i, router, router->outboundLink); - } +void +llarp_rc_set_addrs(struct llarp_rc *rc, struct llarp_alloc *mem, + struct llarp_ai_list *addr) +{ + rc->addrs = llarp_ai_list_new(); + struct llarp_ai_list_iter ai_itr; + ai_itr.user = rc; + ai_itr.visit = &llarp_rc_addr_list_iter; + llarp_ai_list_iterate(addr, &ai_itr); +} - void - llarp_free_router(struct llarp_router **router) - { - if(*router) - { - delete *router; - } - *router = nullptr; - } +bool +llarp_rc_write(struct llarp_rc *rc, const char *fpath) +{ + fs::path our_rc_file(fpath); + byte_t tmp[MAX_RC_SIZE]; + auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - void - llarp_router_override_path_selection(struct llarp_router *router, - llarp_pathbuilder_select_hop_func func) + if(llarp_rc_bencode(rc, &buf)) { - if(func) - router->selectHopFunc = func; - } - - bool - llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, - byte_t *secretkey) - { - llarp::Debug("find or create ", fpath); - fs::path path(fpath); - std::error_code ec; - if(!fs::exists(path, ec)) - { - llarp::Info("generating new identity key"); - crypto->identity_keygen(secretkey); - std::ofstream f(path, std::ios::binary); - if(f.is_open()) - { - f.write((char *)secretkey, SECKEYSIZE); - } - } - std::ifstream f(path, std::ios::binary); + std::ofstream f(our_rc_file, std::ios::binary); if(f.is_open()) { - f.read((char *)secretkey, SECKEYSIZE); + f.write((char *)buf.base, buf.cur - buf.base); return true; } - llarp::Info("failed to get identity key"); - return false; } + return false; +} + +void +llarp_rc_sign(llarp_crypto *crypto, const byte_t *seckey, struct llarp_rc *rc) +{ + byte_t buf[MAX_RC_SIZE]; + auto signbuf = llarp::StackBuffer< decltype(buf) >(buf); + // zero out previous signature + llarp::Zero(rc->signature, sizeof(rc->signature)); + // encode + if(llarp_rc_bencode(rc, &signbuf)) + { + // sign + signbuf.sz = signbuf.cur - signbuf.base; + crypto->sign(rc->signature, seckey, signbuf); + } +} + +void +llarp_stop_router(struct llarp_router *router) +{ + if(router) + router->Close(); +} + +void +llarp_router_iterate_links(struct llarp_router *router, + struct llarp_router_link_iter i) +{ + for(auto link : router->inboundLinks) + if(!i.visit(&i, router, link)) + return; + i.visit(&i, router, router->outboundLink); +} + +void +llarp_free_router(struct llarp_router **router) +{ + if(*router) + { + delete *router; + } + *router = nullptr; +} + +void +llarp_router_override_path_selection(struct llarp_router *router, + llarp_pathbuilder_select_hop_func func) +{ + if(func) + router->selectHopFunc = func; +} + +bool +llarp_findOrCreateIdentity(llarp_crypto *crypto, const char *fpath, + byte_t *secretkey) +{ + llarp::Debug("find or create ", fpath); + fs::path path(fpath); + std::error_code ec; + if(!fs::exists(path, ec)) + { + llarp::Info("generating new identity key"); + crypto->identity_keygen(secretkey); + std::ofstream f(path, std::ios::binary); + if(f.is_open()) + { + f.write((char *)secretkey, SECKEYSIZE); + } + } + std::ifstream f(path, std::ios::binary); + if(f.is_open()) + { + f.read((char *)secretkey, SECKEYSIZE); + return true; + } + llarp::Info("failed to get identity key"); + return false; +} } // end extern C From 54aed396c27ff4fd05af37039e7cb042cd9c6e28 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:54:43 -0400 Subject: [PATCH 10/12] don't repeat process messages on link layer --- llarp/iwp_link.cpp | 13 +++++++------ llarp/path.cpp | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 393e23b4d..a3b615511 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -469,7 +469,7 @@ namespace iwp bool operator()(const InboundMessage *left, const InboundMessage *right) { - return left->msgid > right->msgid; + return left->msgid < right->msgid; } }; @@ -524,6 +524,7 @@ namespace iwp InboundMessage::OrderCompare > q; recvqueue.Process(q); + bool increment = false; while(q.size()) { // TODO: is this right? @@ -536,7 +537,10 @@ namespace iwp } delete front; q.pop(); + increment = true; } + if(increment) + ++nextMsgID; // TODO: this isn't right return true; } @@ -1809,14 +1813,11 @@ namespace iwp } ++nextMsgID; } - else - { - recvqueue.Put(new InboundMessage(id, msg)); - success = process_inbound_queue(); - } } else { + llarp::Warn("out of order message expected ", nextMsgID, " but got ", + id); recvqueue.Put(new InboundMessage(id, msg)); success = true; } diff --git a/llarp/path.cpp b/llarp/path.cpp index 041f5ba9a..a430fd8e8 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -309,7 +309,7 @@ namespace llarp Path::Tick(llarp_time_t now, llarp_router* r) { auto dlt = now - m_LastLatencyTestTime; - if(dlt > 5000) + if(dlt > 5000 && m_LastLatencyTestID == 0) { llarp::routing::PathLatencyMessage latency; latency.T = rand(); @@ -432,6 +432,7 @@ namespace llarp Latency = llarp_time_now_ms() - m_LastLatencyTestTime; llarp::Info("path latency is ", Latency, " ms for tx=", TXID(), " rx=", RXID()); + m_LastLatencyTestID = 0; return true; } return false; From b994bf5f5bc382c6593dbe05bf3b346c379c4838 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 09:57:31 -0400 Subject: [PATCH 11/12] fix previous commit --- llarp/iwp_link.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index a3b615511..975b438b1 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -1811,8 +1811,8 @@ namespace iwp impl->parent->close(impl->parent); success = false; } - ++nextMsgID; } + ++nextMsgID; } else { From f134b8c4d1d539869893987bed9f66bf88a6573e Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Tue, 3 Jul 2018 10:04:13 -0400 Subject: [PATCH 12/12] always put messages in inbound processing queue --- llarp/iwp_link.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/llarp/iwp_link.cpp b/llarp/iwp_link.cpp index 975b438b1..e43e06281 100644 --- a/llarp/iwp_link.cpp +++ b/llarp/iwp_link.cpp @@ -1793,9 +1793,10 @@ namespace iwp if(id == nextMsgID) { session *impl = static_cast< session * >(parent->impl); - success = router->HandleRecvLinkMessage(parent, buf); + if(id == 0) { + success = router->HandleRecvLinkMessage(parent, buf); if(impl->CheckRCValid()) { if(!impl->IsEstablished()) @@ -1803,6 +1804,7 @@ namespace iwp impl->send_LIM(); impl->session_established(); } + ++nextMsgID; } else { @@ -1812,7 +1814,11 @@ namespace iwp success = false; } } - ++nextMsgID; + else + { + recvqueue.Put(new InboundMessage(id, msg)); + success = process_inbound_queue(); + } } else {