1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

Fix some ngtcp2 usage and conn issue detection

Clients now get an ERR_PROTO packet when something goes wrong and the
server is unhappy.

ngtcp2_accept changed between our last update of it and current, this
fixes its usage.

With a minor change to the C API, liblokinet userland will now be able
to receive notification when a connection is (un)successfully opened,
and when it closes.  TODO: supply a reason for the close, if we decide
it should.
This commit is contained in:
Thomas Winget 2023-01-19 00:07:01 -05:00
parent 4c95fc53b6
commit ebcb44ae3a
7 changed files with 94 additions and 18 deletions

View file

@ -25,6 +25,8 @@
namespace
{
static auto logcat = llarp::log::Cat("liblokinet");
struct Context : public llarp::Context
{
using llarp::Context::Context;
@ -690,6 +692,7 @@ extern "C"
stream_error(result, EINVAL);
return;
}
auto call = [&promise,
ctx,
result,
@ -714,9 +717,18 @@ extern "C"
}
try
{
// FIXME: callback for client-land?
auto on_open = [localAddr,remotehost,remoteport](bool success) {
llarp::log::info(logcat, "Quic tunnel {}<->{}:{} {}.",
localAddr, remotehost, remoteport,
success ? "opened successfully" : "failed");
};
auto on_close = [localAddr,remotehost,remoteport]() {
llarp::log::info(logcat, "Quic tunnel {}<->{}:{} closed.",
localAddr, remotehost, remoteport);
};
auto [addr, id] = quic->open(
remotehost, remoteport, [](bool success) {}, localAddr);
remotehost, remoteport, std::move(on_open), std::move(on_close), localAddr);
auto [host, port] = split_host_port(addr.ToString());
ctx->outbound_stream(id);
stream_okay(result, host, port, id);

View file

@ -287,6 +287,11 @@ namespace llarp::quic
// least 1 when this callback is invoked).
std::function<void(Connection&)> on_stream_available;
// Callback that is invoked by the Connection's owning Endpoint whenever the Connection
// is put into a "closing" state (draining, closing, immediate dismissal, etc.) After
// calling this once, the Endpoint clears it.
std::function<void(Connection&)> on_closing;
// Returns the number of available streams that can currently be opened on the connection
int
get_streams_available();

View file

@ -102,8 +102,8 @@ namespace llarp::quic
{
ngtcp2_version_cid vi;
auto rv = ngtcp2_pkt_decode_version_cid(&vi, u8data(p.data), p.data.size(), NGTCP2_MAX_CIDLEN);
if (rv == 1)
{ // 1 means Version Negotiation should be sent and otherwise the packet should be ignored
if (rv == NGTCP2_ERR_VERSION_NEGOTIATION)
{ // Version Negotiation should be sent and otherwise the packet should be ignored
send_version_negotiation(vi, p.path.remote);
return std::nullopt;
}
@ -166,8 +166,16 @@ namespace llarp::quic
log::debug(logcat, "Draining connection {}", conn.base_cid);
start_draining(conn);
}
else if (rv == NGTCP2_ERR_PROTO)
{
log::warning(logcat, "Immediate Close-ing connection {} due to error {}", conn.base_cid, ngtcp2_strerror(rv));
close_connection(conn, rv, "ERR_PROTO"sv);
}
else if (rv == NGTCP2_ERR_DROP_CONN)
{
log::warning(logcat, "Deleting connection {} due to error {}", conn.base_cid, ngtcp2_strerror(rv));
delete_conn(conn.base_cid);
}
return {rv};
}
@ -288,6 +296,12 @@ namespace llarp::quic
{
if (conn.draining)
return;
if (conn.on_closing)
{
log::trace(logcat, "Calling Connection.on_closing for connection {}", conn.base_cid);
conn.on_closing(conn); // only call once
conn.on_closing = nullptr;
}
log::debug(logcat, "Putting {} into draining mode", conn.base_cid);
conn.draining = true;
// Recommended draining time is 3*Probe Timeout
@ -357,6 +371,17 @@ namespace llarp::quic
}
bool primary = std::holds_alternative<primary_conn_ptr>(it->second);
if (primary)
{
auto ptr = var::get<primary_conn_ptr>(it->second);
if (ptr->on_closing)
{
log::trace(logcat, "Calling Connection.on_closing for connection {}", cid);
ptr->on_closing(*ptr); // only call once
ptr->on_closing = nullptr;
}
}
log::debug(logcat, "Deleting {} connection {}", primary ? "primary" : "alias", cid);
conns.erase(it);
if (primary)

View file

@ -207,7 +207,7 @@ namespace llarp::quic
void
check_timeouts();
/// Deletes a connection from `conns`; if the connecion is a primary connection shared pointer
/// Deletes a connection from `conns`; if the connection is a primary connection shared pointer
/// then it is removed and clean_alias_conns() is immediately called to remove any aliases to
/// the connection. If the given connection is an alias connection then it is removed but no
/// cleanup is performed. Returns true if something was removed, false if the connection was

View file

@ -22,14 +22,7 @@ namespace llarp::quic
ngtcp2_pkt_hd hd;
auto rv = ngtcp2_accept(&hd, u8data(p.data), p.data.size());
if (rv == -1)
{ // Invalid packet
log::warning(logcat, "Invalid packet received, length={}", p.data.size());
log::trace(logcat, "packet body: {}", buffer_printer{p.data});
return nullptr;
}
if (rv == 1)
if (rv == NGTCP2_ERR_VERSION_NEGOTIATION)
{ // Invalid/unexpected version, send a version negotiation
log::debug(logcat, "Invalid/unsupported version; sending version negotiation");
send_version_negotiation(
@ -38,6 +31,13 @@ namespace llarp::quic
p.path.remote);
return nullptr;
}
else if (rv < 0)
{ // Invalid packet. rv could be NGTCP2_ERR_RETRY but that will only
// happen if the incoming packet is 0RTT which we don't use.
log::warning(logcat, "Invalid packet received, length={}", p.data.size());
log::trace(logcat, "packet body: {}", buffer_printer{p.data});
return nullptr;
}
if (hd.type == NGTCP2_PKT_0RTT)
{

View file

@ -100,10 +100,11 @@ namespace llarp::quic
{
tcp.data(stream.shared_from_this());
stream.weak_data(tcp.weak_from_this());
auto weak_conn = stream.get_connection().weak_from_this();
tcp.clear(); // Clear any existing initial event handlers
tcp.on<uvw::CloseEvent>([](auto&, uvw::TCPHandle& c) {
tcp.on<uvw::CloseEvent>([weak_conn=std::move(weak_conn)](auto&, uvw::TCPHandle& c) {
// This fires sometime after we call `close()` to signal that the close is done.
if (auto stream = c.data<Stream>())
{
@ -111,7 +112,12 @@ namespace llarp::quic
logcat,
"Local TCP connection closed, closing associated quic stream {}",
stream->id());
stream->close();
// There is an awkwardness with Stream ownership, so make sure the Connection
// which it holds a reference to still exists, as stream->close will segfault
// otherwise
if (auto locked_conn = weak_conn.lock())
stream->close();
stream->data(nullptr);
}
c.data(nullptr);
@ -237,6 +243,8 @@ namespace llarp::quic
if (ct.conns.empty() and (not ct.tcp or not ct.tcp->active()))
{
log::debug(logcat, "All sockets closed on quic:{}, destroying tunnel data", port);
if (ct.close_cb)
ct.close_cb();
ctit = client_tunnels_.erase(ctit);
}
else
@ -249,8 +257,6 @@ namespace llarp::quic
void
TunnelManager::make_server()
{
// auto loop = get_loop();
server_ = std::make_unique<Server>(service_endpoint_);
server_->stream_open_callback = [this](Stream& stream, uint16_t port) -> bool {
stream.close_callback = close_tcp_pair;
@ -437,7 +443,7 @@ namespace llarp::quic
std::pair<SockAddr, uint16_t>
TunnelManager::open(
std::string_view remote_address, uint16_t port, OpenCallback on_open, SockAddr bind_addr)
std::string_view remote_address, uint16_t port, OpenCallback on_open, CloseCallback on_close, SockAddr bind_addr)
{
std::string remote_addr = lowercase_ascii_string(std::string{remote_address});
@ -509,6 +515,7 @@ namespace llarp::quic
assert(client_tunnels_.count(pport) == 0);
auto& ct = client_tunnels_[pport];
ct.open_cb = std::move(on_open);
ct.close_cb = std::move(on_close);
ct.tcp = std::move(tcp_tunnel);
// We use this pport shared_ptr value on the listening tcp socket both to hand to pport into the
// accept handler, and to let the accept handler know that `this` is still safe to use.
@ -604,8 +611,29 @@ namespace llarp::quic
{
flush_pending_incoming(it->second);
if (it->second.open_cb)
{
log::trace(logcat, "Calling ClientTunnel.open_cb()");
it->second.open_cb(true);
it->second.open_cb = nullptr; // only call once
}
}
else
log::warning(logcat, "Connection.on_stream_available fired but we have no associated ClientTunnel!");
};
conn->on_closing = [this, id = row.first](Connection&) {
log::debug(logcat, "QUIC connection :{} closing, closing tunnel", id);
if (auto it = client_tunnels_.find(id); it != client_tunnels_.end())
{
if (it->second.close_cb)
{
log::trace(logcat, "Calling ClientTunnel.close_cb()");
it->second.close_cb();
}
}
else
log::debug(logcat, "Connection.on_closing fired but no associated ClientTunnel found.");
this->close(id);
};
}

View file

@ -80,6 +80,9 @@ namespace llarp::quic
/// Called when open succeeds or times out.
using OpenCallback = std::function<void(bool success)>;
/// Called when the tunnel is closed for any reason
using CloseCallback = std::function<void(void)>;
/// Opens a quic tunnel to some remote lokinet address. (Should only be called from the event
/// loop thread.)
///
@ -115,6 +118,7 @@ namespace llarp::quic
std::string_view remote_addr,
uint16_t port,
OpenCallback on_open = {},
CloseCallback on_close = {},
SockAddr bind_addr = {127, 0, 0, 1});
/// Start closing an outgoing tunnel; takes the ID returned by `open()`. Note that an existing
@ -147,6 +151,8 @@ namespace llarp::quic
std::unique_ptr<Client> client;
// Callback to invoke on quic connection established (true argument) or failed (false arg)
OpenCallback open_cb;
// Callback to invoke when the tunnel is closed, if it was successfully opened
CloseCallback close_cb;
// TCP listening socket
std::shared_ptr<uvw::TCPHandle> tcp;
// Accepted TCP connections