diff --git a/daemon/lokinet.cpp b/daemon/lokinet.cpp index ab89e790a..70c9648f2 100644 --- a/daemon/lokinet.cpp +++ b/daemon/lokinet.cpp @@ -390,6 +390,9 @@ lokinet_main(int argc, char* argv[]) llarp::log::add_sink(llarp::log::Type::Print, "stderr"); llarp::log::reset_level(llarp::log::Level::info); + llarp::logRingBuffer = std::make_shared(100); + llarp::log::add_sink(llarp::logRingBuffer, llarp::log::DEFAULT_PATTERN_MONO); + llarp::RuntimeOptions opts; opts.showBanner = false; diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt index 19ed9b05f..0e43855c0 100644 --- a/external/CMakeLists.txt +++ b/external/CMakeLists.txt @@ -62,7 +62,8 @@ macro(system_or_submodule BIGNAME smallname pkgconf subdir) endmacro() system_or_submodule(OXENC oxenc liboxenc>=1.0.4 oxen-encoding) -system_or_submodule(OXENMQ oxenmq liboxenmq>=1.2.12 oxen-mq) +system_or_submodule(OXENMQ oxenmq liboxenmq>=1.2.14 oxen-mq) + set(JSON_BuildTests OFF CACHE INTERNAL "") set(JSON_Install OFF CACHE INTERNAL "") system_or_submodule(NLOHMANN nlohmann_json nlohmann_json>=3.7.0 nlohmann) diff --git a/external/oxen-logging b/external/oxen-logging index 98a8882c8..9f2323a2d 160000 --- a/external/oxen-logging +++ b/external/oxen-logging @@ -1 +1 @@ -Subproject commit 98a8882c81aa046fbadc0571fcea7bf92ed20154 +Subproject commit 9f2323a2db5fc54fe8394892769eff859967f735 diff --git a/external/oxen-mq b/external/oxen-mq index eadb37c76..526f51ba4 160000 --- a/external/oxen-mq +++ b/external/oxen-mq @@ -1 +1 @@ -Subproject commit eadb37c7654150bef18497773718f15ef843734a +Subproject commit 526f51ba4ef273b58a46746cbaf72f2a84371f30 diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index bb9e0f514..440c848f6 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -59,7 +59,7 @@ namespace llarp , inbound_link_msg_parser{this} , _hiddenServiceContext{this} , m_RoutePoker{std::make_shared()} - , m_RPCServer{new rpc::RpcServer{m_lmq, this}} + , m_RPCServer{nullptr} , _randomStartDelay{ platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000} : 0s} @@ -791,6 +791,12 @@ namespace llarp log::clear_sinks(); log::add_sink(log_type, conf.logging.m_logFile); + // re-add rpc log sink if rpc enabled, else free it + if (enableRPCServer and llarp::logRingBuffer) + log::add_sink(llarp::logRingBuffer, llarp::log::DEFAULT_PATTERN_MONO); + else + llarp::logRingBuffer = nullptr; + return true; } @@ -1206,6 +1212,7 @@ namespace llarp { if (enableRPCServer) { + m_RPCServer.reset(new rpc::RpcServer{m_lmq, this}); m_RPCServer->AsyncServeRPC(rpcBindAddr); LogInfo("Bound RPC server to ", rpcBindAddr.full_address()); } diff --git a/llarp/rpc/rpc_server.cpp b/llarp/rpc/rpc_server.cpp index ecff8c3f3..7a0c23969 100644 --- a/llarp/rpc/rpc_server.cpp +++ b/llarp/rpc/rpc_server.cpp @@ -12,10 +12,17 @@ #include #include #include +#include + +namespace +{ + static auto logcat = llarp::log::Cat("lokinet.rpc"); +} // namespace namespace llarp::rpc { - RpcServer::RpcServer(LMQ_ptr lmq, AbstractRouter* r) : m_LMQ(std::move(lmq)), m_Router(r) + RpcServer::RpcServer(LMQ_ptr lmq, AbstractRouter* r) + : m_LMQ{std::move(lmq)}, m_Router{r}, log_subs{*m_LMQ, llarp::logRingBuffer} {} /// maybe parse json from message paramter at index @@ -138,6 +145,7 @@ namespace llarp::rpc { m_LMQ->listen_plain(url.zmq_address()); m_LMQ->add_category("llarp", oxenmq::AuthLevel::none) + .add_request_command("logs", [this](oxenmq::Message& msg) { HandleLogsSubRequest(msg); }) .add_command( "halt", [&](oxenmq::Message& msg) { @@ -708,4 +716,39 @@ namespace llarp::rpc }); }); } + + void + RpcServer::HandleLogsSubRequest(oxenmq::Message& m) + { + if (m.data.size() != 1) + { + m.send_reply("Invalid subscription request: no log receipt endpoint given"); + return; + } + + auto endpoint = std::string{m.data[0]}; + + if (endpoint == "unsubscribe") + { + log::info(logcat, "New logs unsubscribe request from conn {}@{}", m.conn, m.remote); + log_subs.unsubscribe(m.conn); + m.send_reply("OK"); + return; + } + + auto is_new = log_subs.subscribe(m.conn, endpoint); + + if (is_new) + { + log::info(logcat, "New logs subscription request from conn {}@{}", m.conn, m.remote); + m.send_reply("OK"); + log_subs.send_all(m.conn, endpoint); + } + else + { + log::info(logcat, "Renewed logs subscription request from conn id {}@{}", m.conn, m.remote); + m.send_reply("ALREADY"); + } + } + } // namespace llarp::rpc diff --git a/llarp/rpc/rpc_server.hpp b/llarp/rpc/rpc_server.hpp index ca21e9e01..5b411c494 100644 --- a/llarp/rpc/rpc_server.hpp +++ b/llarp/rpc/rpc_server.hpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace llarp { @@ -21,7 +22,12 @@ namespace llarp::rpc AsyncServeRPC(oxenmq::address addr); private: + void + HandleLogsSubRequest(oxenmq::Message& m); + LMQ_ptr m_LMQ; AbstractRouter* const m_Router; + + oxen::log::PubsubLogger log_subs; }; } // namespace llarp::rpc diff --git a/llarp/util/logging.hpp b/llarp/util/logging.hpp index b51ad1827..09c31fc92 100644 --- a/llarp/util/logging.hpp +++ b/llarp/util/logging.hpp @@ -7,6 +7,7 @@ #include #include +#include #include "oxen/log/internal.hpp" namespace llarp @@ -25,6 +26,8 @@ namespace llarp // Deprecated loggers (in the top-level llarp namespace): namespace llarp { + inline std::shared_ptr logRingBuffer = nullptr; + namespace log_detail { inline log::CategoryLogger legacy_logger = log::Cat(""); diff --git a/llarp/win32/exec.cpp b/llarp/win32/exec.cpp index bce7f02d7..4f17b8f04 100644 --- a/llarp/win32/exec.cpp +++ b/llarp/win32/exec.cpp @@ -31,7 +31,7 @@ namespace llarp::win32 } OneShotExec::OneShotExec(std::string cmd, std::chrono::milliseconds timeout) - : _si{}, _pi{}, _timeout{timeout.count()} + : _si{}, _pi{}, _timeout{static_cast(timeout.count())} { log::info(logcat, "exec: {}", cmd); if (not CreateProcessA(