omq rpc log subscription

respects whether RPC is enabled, removes the log sink otherwise

bumps oxen-mq and oxen-logging
This commit is contained in:
Thomas Winget 2022-09-28 12:37:19 -04:00 committed by Jason Rhinelander
parent fc07b8a10e
commit eaf30de1fd
No known key found for this signature in database
GPG Key ID: C4992CE7A88D4262
9 changed files with 69 additions and 6 deletions

View File

@ -390,6 +390,9 @@ lokinet_main(int argc, char* argv[])
llarp::log::add_sink(llarp::log::Type::Print, "stderr");
llarp::log::reset_level(llarp::log::Level::info);
llarp::logRingBuffer = std::make_shared<llarp::log::RingBufferSink>(100);
llarp::log::add_sink(llarp::logRingBuffer, llarp::log::DEFAULT_PATTERN_MONO);
llarp::RuntimeOptions opts;
opts.showBanner = false;

View File

@ -62,7 +62,8 @@ macro(system_or_submodule BIGNAME smallname pkgconf subdir)
endmacro()
system_or_submodule(OXENC oxenc liboxenc>=1.0.4 oxen-encoding)
system_or_submodule(OXENMQ oxenmq liboxenmq>=1.2.12 oxen-mq)
system_or_submodule(OXENMQ oxenmq liboxenmq>=1.2.14 oxen-mq)
set(JSON_BuildTests OFF CACHE INTERNAL "")
set(JSON_Install OFF CACHE INTERNAL "")
system_or_submodule(NLOHMANN nlohmann_json nlohmann_json>=3.7.0 nlohmann)

@ -1 +1 @@
Subproject commit 98a8882c81aa046fbadc0571fcea7bf92ed20154
Subproject commit 9f2323a2db5fc54fe8394892769eff859967f735

2
external/oxen-mq vendored

@ -1 +1 @@
Subproject commit eadb37c7654150bef18497773718f15ef843734a
Subproject commit 526f51ba4ef273b58a46746cbaf72f2a84371f30

View File

@ -59,7 +59,7 @@ namespace llarp
, inbound_link_msg_parser{this}
, _hiddenServiceContext{this}
, m_RoutePoker{std::make_shared<RoutePoker>()}
, m_RPCServer{new rpc::RpcServer{m_lmq, this}}
, m_RPCServer{nullptr}
, _randomStartDelay{
platform::is_simulation ? std::chrono::milliseconds{(llarp::randint() % 1250) + 2000}
: 0s}
@ -791,6 +791,12 @@ namespace llarp
log::clear_sinks();
log::add_sink(log_type, conf.logging.m_logFile);
// re-add rpc log sink if rpc enabled, else free it
if (enableRPCServer and llarp::logRingBuffer)
log::add_sink(llarp::logRingBuffer, llarp::log::DEFAULT_PATTERN_MONO);
else
llarp::logRingBuffer = nullptr;
return true;
}
@ -1206,6 +1212,7 @@ namespace llarp
{
if (enableRPCServer)
{
m_RPCServer.reset(new rpc::RpcServer{m_lmq, this});
m_RPCServer->AsyncServeRPC(rpcBindAddr);
LogInfo("Bound RPC server to ", rpcBindAddr.full_address());
}

View File

@ -12,10 +12,17 @@
#include <llarp/service/name.hpp>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/dns/dns.hpp>
#include <oxenmq/fmt.h>
namespace
{
static auto logcat = llarp::log::Cat("lokinet.rpc");
} // namespace
namespace llarp::rpc
{
RpcServer::RpcServer(LMQ_ptr lmq, AbstractRouter* r) : m_LMQ(std::move(lmq)), m_Router(r)
RpcServer::RpcServer(LMQ_ptr lmq, AbstractRouter* r)
: m_LMQ{std::move(lmq)}, m_Router{r}, log_subs{*m_LMQ, llarp::logRingBuffer}
{}
/// maybe parse json from message paramter at index
@ -138,6 +145,7 @@ namespace llarp::rpc
{
m_LMQ->listen_plain(url.zmq_address());
m_LMQ->add_category("llarp", oxenmq::AuthLevel::none)
.add_request_command("logs", [this](oxenmq::Message& msg) { HandleLogsSubRequest(msg); })
.add_command(
"halt",
[&](oxenmq::Message& msg) {
@ -708,4 +716,39 @@ namespace llarp::rpc
});
});
}
void
RpcServer::HandleLogsSubRequest(oxenmq::Message& m)
{
if (m.data.size() != 1)
{
m.send_reply("Invalid subscription request: no log receipt endpoint given");
return;
}
auto endpoint = std::string{m.data[0]};
if (endpoint == "unsubscribe")
{
log::info(logcat, "New logs unsubscribe request from conn {}@{}", m.conn, m.remote);
log_subs.unsubscribe(m.conn);
m.send_reply("OK");
return;
}
auto is_new = log_subs.subscribe(m.conn, endpoint);
if (is_new)
{
log::info(logcat, "New logs subscription request from conn {}@{}", m.conn, m.remote);
m.send_reply("OK");
log_subs.send_all(m.conn, endpoint);
}
else
{
log::info(logcat, "Renewed logs subscription request from conn id {}@{}", m.conn, m.remote);
m.send_reply("ALREADY");
}
}
} // namespace llarp::rpc

View File

@ -3,6 +3,7 @@
#include <string_view>
#include <oxenmq/oxenmq.h>
#include <oxenmq/address.h>
#include <oxen/log/omq_logger.hpp>
namespace llarp
{
@ -21,7 +22,12 @@ namespace llarp::rpc
AsyncServeRPC(oxenmq::address addr);
private:
void
HandleLogsSubRequest(oxenmq::Message& m);
LMQ_ptr m_LMQ;
AbstractRouter* const m_Router;
oxen::log::PubsubLogger log_subs;
};
} // namespace llarp::rpc

View File

@ -7,6 +7,7 @@
#include <array>
#include <oxen/log.hpp>
#include <oxen/log/ring_buffer_sink.hpp>
#include "oxen/log/internal.hpp"
namespace llarp
@ -25,6 +26,8 @@ namespace llarp
// Deprecated loggers (in the top-level llarp namespace):
namespace llarp
{
inline std::shared_ptr<log::RingBufferSink> logRingBuffer = nullptr;
namespace log_detail
{
inline log::CategoryLogger legacy_logger = log::Cat("");

View File

@ -31,7 +31,7 @@ namespace llarp::win32
}
OneShotExec::OneShotExec(std::string cmd, std::chrono::milliseconds timeout)
: _si{}, _pi{}, _timeout{timeout.count()}
: _si{}, _pi{}, _timeout{static_cast<DWORD>(timeout.count())}
{
log::info(logcat, "exec: {}", cmd);
if (not CreateProcessA(