1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

refactoring liblokinet export fxns

This commit is contained in:
dan 2023-04-19 10:05:30 -07:00
parent 2b17059213
commit 3ae45781c8
5 changed files with 192 additions and 169 deletions

View file

@ -4,5 +4,5 @@
#include "lokinet/lokinet_srv.h"
#include "lokinet/lokinet_misc.h"
#include "lokinet/lokinet_addr.h"
#include "lokinet/lokinet_stream.h"
#include "lokinet/lokinet_tcp.h"
#include "lokinet/lokinet_udp.h"

View file

@ -7,28 +7,29 @@ extern "C"
{
#endif
/// the result of a lokinet stream mapping attempt
struct lokinet_stream_result
/// the result of a lokinet tcp mapping attempt
struct lokinet_tcp_result
{
/// set to zero on success otherwise the error that happened
/// use strerror(3) to get printable string of this error
int error;
int error{0};
/// set once the tcp connection is successfully established
bool success{false};
/// the local ip address we mapped the remote endpoint to
/// null terminated
char local_address[256];
/// the local port we mapped the remote endpoint to
int local_port;
/// the id of the stream we created
int stream_id;
/// the id (aka: 'pseudo-port') of the tcp we created
int tcp_id;
};
/// connect out to a remote endpoint
/// remoteAddr is in the form of "name:port"
/// localAddr is either NULL for any or in the form of "ip:port" to bind to an explicit address
void EXPORT
lokinet_outbound_stream(
struct lokinet_stream_result* result,
lokinet_outbound_tcp(
struct lokinet_tcp_result* result,
const char* remoteAddr,
const char* localAddr,
struct lokinet_context* context,
@ -36,26 +37,26 @@ extern "C"
void (*close_cb)(int rv, void* user_data),
void* user_data);
/// stream accept filter determines if we should accept a stream or not
/// tcp accept filter determines if we should accept a tcp or not
/// return 0 to accept
/// return -1 to explicitly reject
/// return -2 to silently drop
typedef int (*lokinet_stream_filter)(const char* remote, uint16_t port, void* userdata);
typedef int (*lokinet_tcp_filter)(const char* remote, uint16_t port, void* userdata);
/// set stream accepter filter
/// passes user parameter into stream filter as void *
/// returns stream id
/// set tcp accepter filter
/// passes user parameter into tcp filter as void *
/// returns tcp id
int EXPORT
lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* context);
lokinet_inbound_tcp_filter(
lokinet_tcp_filter acceptFilter, void* user, struct lokinet_context* context);
/// simple stream acceptor
/// simple variant of lokinet_inbound_stream_filter that maps port to localhost:port
/// simple tcp acceptor
/// simple variant of lokinet_inbound_tcp_filter that maps port to localhost:port
int EXPORT
lokinet_inbound_stream(uint16_t port, struct lokinet_context* context);
lokinet_inbound_tcp(uint16_t port, struct lokinet_context* context);
void EXPORT
lokinet_close_stream(int stream_id, struct lokinet_context* context);
lokinet_close_tcp(int tcp_id, struct lokinet_context* context);
#ifdef __cplusplus
}

View file

@ -1,4 +1,6 @@
#include <lokinet.h>
#include <algorithm>
#include <exception>
#include <filesystem>
#include <llarp.hpp>
#include <llarp/config/config.hpp>
@ -19,6 +21,8 @@
#include <memory>
#include <chrono>
#include <stdexcept>
#include <unordered_map>
#include "lokinet/lokinet_tcp.h"
#ifdef _WIN32
#define EHOSTDOWN ENETDOWN
@ -311,41 +315,48 @@ struct lokinet_context
return impl->router->hiddenServiceContext().GetEndpointByName(name);
}
std::unordered_map<int, bool> streams;
/// false: outbound connection
/// true: inbound connection
std::unordered_map<int, bool> tcp_conns;
/// maps address to pair of (stream_id, ready)
std::unordered_map<std::string, lokinet_tcp_result> active_conns;
std::unordered_map<int, std::shared_ptr<UDPHandler>> udp_sockets;
void
inbound_stream(int id)
inbound_tcp(int id)
{
streams[id] = true;
tcp_conns[id] = true;
}
void
outbound_stream(int id)
outbound_tcp(std::string remote_addr, lokinet_tcp_result& res)
{
streams[id] = false;
tcp_conns[res.tcp_id] = false;
active_conns[remote_addr] = lokinet_tcp_result{res};
}
};
namespace
{
void
stream_error(lokinet_stream_result* result, int err)
tcp_error(lokinet_tcp_result* result, int err)
{
std::memset(result, 0, sizeof(lokinet_stream_result));
std::memset(result, 0, sizeof(lokinet_tcp_result));
result->error = err;
}
void
stream_okay(lokinet_stream_result* result, std::string host, int port, int stream_id)
tcp_okay(lokinet_tcp_result* result, std::string host, int port, int tcp_id)
{
stream_error(result, 0);
tcp_error(result, 0);
std::copy_n(
host.c_str(),
std::min(host.size(), sizeof(result->local_address) - 1),
result->local_address);
result->local_port = port;
result->stream_id = stream_id;
result->tcp_id = tcp_id;
}
std::pair<std::string, int>
@ -358,7 +369,7 @@ namespace
portStr = data.substr(pos + 1);
}
else
throw EINVAL;
throw std::invalid_argument("Error: invalid address passed");
if (auto* serv = getservbyname(portStr.c_str(), proto.c_str()))
{
@ -643,8 +654,8 @@ extern "C"
}
void EXPORT
lokinet_outbound_stream(
struct lokinet_stream_result* result,
lokinet_outbound_tcp(
struct lokinet_tcp_result* result,
const char* remote,
const char* local,
struct lokinet_context* ctx,
@ -654,155 +665,159 @@ extern "C"
{
if (ctx == nullptr)
{
stream_error(result, EHOSTDOWN);
tcp_error(result, EHOSTDOWN);
return;
}
std::promise<void> promise;
if (auto itr = ctx->active_conns.find(remote); itr != ctx->active_conns.end())
{
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
{
stream_error(result, EHOSTDOWN);
return;
}
std::string remotehost;
int remoteport;
try
{
auto [h, p] = split_host_port(remote);
remotehost = h;
remoteport = p;
}
catch (int err)
{
stream_error(result, err);
return;
}
// TODO: make configurable (?)
// FIXME: appears unused?
std::string endpoint{"default"};
llarp::SockAddr localAddr;
try
{
if (local)
localAddr = llarp::SockAddr{std::string{local}};
else
localAddr = llarp::SockAddr{"127.0.0.1:0"};
}
catch (std::exception& ex)
{
stream_error(result, EINVAL);
return;
}
auto on_open = [localAddr, remotehost, remoteport, open_cb](bool success, void* user_data) {
llarp::log::info(
logcat,
"Quic tunnel {}<->{}:{} {}.",
localAddr,
remotehost,
remoteport,
success ? "opened successfully" : "failed");
if (open_cb)
open_cb(success, user_data);
};
auto on_close = [localAddr, remotehost, remoteport, close_cb](int rv, void* user_data) {
llarp::log::info(
logcat, "Quic tunnel {}<->{}:{} closed.", localAddr, remotehost, remoteport);
if (close_cb)
close_cb(rv, user_data);
};
auto call = [&promise,
ctx,
result,
router = ctx->impl->router,
remotehost,
remoteport,
endpoint, // FIXME: appears unused?
on_open,
on_close,
localAddr]() {
auto ep = ctx->endpoint();
if (ep == nullptr)
{
stream_error(result, ENOTSUP);
promise.set_value();
return;
}
auto* quic = ep->GetQUICTunnel();
if (quic == nullptr)
{
stream_error(result, ENOTSUP);
promise.set_value();
return;
}
try
{
auto [addr, id] = quic->open(
remotehost, remoteport, std::move(on_open), std::move(on_close), localAddr);
auto [host, port] = split_host_port(addr.ToString());
ctx->outbound_stream(id);
stream_okay(result, host, port, id);
}
catch (std::exception& ex)
{
std::cout << ex.what() << std::endl;
stream_error(result, ECANCELED);
}
catch (int err)
{
stream_error(result, err);
}
promise.set_value();
};
ctx->impl->CallSafe([call]() {
// we dont want the mainloop to die in case setting the value on the promise fails
try
{
call();
}
catch (...)
{}
});
result->success = true;
llarp::LogError("Active connection to {} already exists", remote);
return;
}
auto future = promise.get_future();
std::promise<void> promise;
std::future<void> future = promise.get_future();
auto lock = ctx->acquire();
if (not ctx->impl->IsUp())
{
tcp_error(result, EHOSTDOWN);
return;
}
std::string remotehost;
int remoteport;
try
{
if (auto status = future.wait_for(std::chrono::seconds{10});
status == std::future_status::ready)
{
future.get();
}
auto [h, p] = split_host_port(remote);
remotehost = h;
remoteport = p;
}
catch (std::exception& e)
{
llarp::log::error(logcat, "Error: exception caught: {}", e.what());
tcp_error(result, EINVAL);
return;
}
// TODO: make configurable (?)
// FIXME: appears unused?
std::string endpoint{"default"};
llarp::SockAddr localAddr;
try
{
if (local)
localAddr = llarp::SockAddr{std::string{local}};
else
{
stream_error(result, ETIMEDOUT);
}
localAddr = llarp::SockAddr{"127.0.0.1:0"};
}
catch (std::exception& ex)
{
stream_error(result, EBADF);
tcp_error(result, EINVAL);
return;
}
auto on_open = [result, localAddr, remotehost, remoteport, open_cb](bool success, void* user_data) {
llarp::log::info(
logcat,
"Quic tunnel {}<->{}:{} {}.",
localAddr,
remotehost,
remoteport,
success ? "opened successfully" : "failed");
result->success = success;
if (open_cb)
open_cb(success, user_data);
};
auto on_close = [&ctx, localAddr, remote, close_cb](int rv, void* user_data) {
llarp::log::info(
logcat, "Quic tunnel {}<->{} closed.", localAddr, remote);
ctx->active_conns.erase(remote);
if (close_cb)
close_cb(rv, user_data);
};
ctx->impl->CallSafe([promise = std::move(promise),
ctx,
result,
router = ctx->impl->router,
remotehost,
remoteport,
on_open = std::move(on_open),
on_close = std::move(on_close),
localAddr]() mutable {
try
{
auto ep = ctx->endpoint();
if (not ep)
throw std::runtime_error{"lokinet_context->endpoint() returned null pointer."};
auto* quic = ep->GetQUICTunnel();
if (not quic)
throw std::runtime_error{"lokinet_context endpoint has no quic tunnel manager."};
auto [addr, id] = quic->open(
remotehost, remoteport, std::move(on_open), std::move(on_close), localAddr);
auto [host, port] = split_host_port(addr.ToString());
result->tcp_id = id;
tcp_okay(result, host, port, id);
promise.set_value();
}
catch (...)
{
promise.set_exception(std::current_exception());
}
});
try
{
future.get();
}
catch (std::invalid_argument& e)
{
llarp::log::error(logcat, "Error: exception caught: {}", e.what());
tcp_error(result, EINVAL);
return;
}
catch (std::runtime_error& e)
{
llarp::log::error(logcat, "Error: exception caught: {}", e.what());
tcp_error(result, ENOTSUP);
return;
}
catch (std::exception& e)
{
llarp::log::error(logcat, "Error: exception caught: {}", e.what());
tcp_error(result, EBADF);
return;
}
catch (...)
{
llarp::log::error(logcat, "Unknown exception caught.", );
tcp_error(result, EBADF);
return;
}
ctx->outbound_tcp(remote, *result);
assert(result->error == 0);
return;
}
int EXPORT
lokinet_inbound_stream(uint16_t port, struct lokinet_context* ctx)
lokinet_inbound_tcp(uint16_t port, struct lokinet_context* ctx)
{
/// FIXME: delete pointer later
return lokinet_inbound_stream_filter(&accept_port, (void*)new std::uintptr_t{port}, ctx);
return lokinet_inbound_tcp_filter(&accept_port, (void*)new std::uintptr_t{port}, ctx);
}
int EXPORT
lokinet_inbound_stream_filter(
lokinet_stream_filter acceptFilter, void* user, struct lokinet_context* ctx)
lokinet_inbound_tcp_filter(
lokinet_tcp_filter acceptFilter, void* user, struct lokinet_context* ctx)
{
if (acceptFilter == nullptr)
{
@ -842,7 +857,7 @@ extern "C"
auto id = ftr.get();
{
auto lock = ctx->acquire();
ctx->inbound_stream(id);
ctx->inbound_tcp(id);
}
return id;
}
@ -865,7 +880,7 @@ extern "C"
}
void EXPORT
lokinet_close_stream(int stream_id, struct lokinet_context* ctx)
lokinet_close_tcp(int tcp_id, struct lokinet_context* ctx)
{
if (not ctx)
return;
@ -876,21 +891,26 @@ extern "C"
try
{
std::promise<void> promise;
bool inbound = ctx->streams.at(stream_id);
ctx->impl->CallSafe([stream_id, inbound, ctx, &promise]() {
bool inbound = ctx->tcp_conns.at(tcp_id);
ctx->impl->CallSafe([tcp_id, inbound, ctx, &promise]() {
auto ep = ctx->endpoint();
auto* quic = ep->GetQUICTunnel();
try
{
if (inbound)
quic->forget(stream_id);
quic->forget(tcp_id);
else
quic->close(stream_id);
quic->close(tcp_id);
}
catch (...)
{}
promise.set_value();
});
for (auto& itr : ctx->active_conns)
{
if (itr.second.tcp_id == tcp_id)
ctx->active_conns.erase(itr.first);
}
promise.get_future().get();
}
catch (...)

View file

@ -119,7 +119,9 @@ namespace llarp::quic
// which it holds a reference to still exists, as stream->close will segfault
// otherwise
if (auto locked_conn = weak_conn.lock())
stream->close();
{
stream->close(-1);
}
stream->data(nullptr);
}
c.data(nullptr);

View file

@ -364,4 +364,4 @@ namespace llarp::rpc
}
}
}
} // namespace llarp::rpc
} // namespace llarp::rpc