diff --git a/oxenss/common/namespace.h b/oxenss/common/namespace.h index e88b73e..f073365 100644 --- a/oxenss/common/namespace.h +++ b/oxenss/common/namespace.h @@ -3,6 +3,7 @@ #include #include #include +#include "formattable.h" namespace oxen { @@ -28,4 +29,6 @@ std::string to_string(namespace_id ns); constexpr auto NAMESPACE_MIN = to_int(namespace_id::Min); constexpr auto NAMESPACE_MAX = to_int(namespace_id::Max); +template <> inline constexpr bool to_string_formattable = true; + } // namespace oxen diff --git a/oxenss/crypto/keys.cpp b/oxenss/crypto/keys.cpp index bd64992..8bc8601 100644 --- a/oxenss/crypto/keys.cpp +++ b/oxenss/crypto/keys.cpp @@ -8,7 +8,11 @@ #include #include #include -#include +#include +#include +#include +#include +#include namespace oxen::crypto { @@ -96,4 +100,41 @@ x25519_pubkey parse_x25519_pubkey(std::string_view pubkey_in) { return parse_pubkey(pubkey_in); } +std::array subkey_verify_key(std::string_view pubkey, std::string_view subkey) { + + if (pubkey.size() != 32 || subkey.size() != 32) + throw std::invalid_argument{"Invalid pubkey/subkey: both must be 32 bytes"}; + + return subkey_verify_key( + reinterpret_cast(pubkey.data()), + reinterpret_cast(subkey.data())); +} + +std::array subkey_verify_key( + const unsigned char* pubkey, const unsigned char* subkey) { + + std::array subkey_pub; + // Need to compute: (c + H("OxenSSSubkey" || c || A)) A and use that instead of A for verification: + + // H("OxenSSSubkey" || c || A): + crypto_generichash_state h_state; + crypto_generichash_init( + &h_state, + reinterpret_cast(SUBKEY_HASH_KEY.data()), + SUBKEY_HASH_KEY.size(), + 32); + crypto_generichash_update(&h_state, subkey, 32); // c + crypto_generichash_update(&h_state, pubkey, 32); // A + crypto_generichash_final(&h_state, subkey_pub.data(), 32); + + // c + H(...): + crypto_core_ed25519_scalar_add(subkey_pub.data(), subkey, subkey_pub.data()); + + // (c + H(...)) A: + if (0 != crypto_scalarmult_ed25519_noclamp(subkey_pub.data(), subkey_pub.data(), pubkey)) + throw std::invalid_argument{"Invalid pubkey/subkey combination"}; + + return subkey_pub; +} + } // namespace oxen::crypto diff --git a/oxenss/crypto/keys.h b/oxenss/crypto/keys.h index 736c2db..17c2381 100644 --- a/oxenss/crypto/keys.h +++ b/oxenss/crypto/keys.h @@ -14,6 +14,8 @@ namespace oxen::crypto { using namespace std::literals; +constexpr std::string_view SUBKEY_HASH_KEY = "OxenSSSubkey"sv; + namespace detail { template inline constexpr std::array null_bytes = {0}; @@ -91,6 +93,12 @@ legacy_pubkey parse_legacy_pubkey(std::string_view pubkey_in); ed25519_pubkey parse_ed25519_pubkey(std::string_view pubkey_in); x25519_pubkey parse_x25519_pubkey(std::string_view pubkey_in); +/// Computes the signature verification derived pubkey for a pubkey+subkey string. Throws +/// std::invalid_argument if the pubkey/subkey aren't valid. +std::array subkey_verify_key(std::string_view pubkey, std::string_view subkey); +std::array subkey_verify_key( + const unsigned char* pubkey, const unsigned char* subkey); + } // namespace oxen::crypto template <> diff --git a/oxenss/rpc/client_rpc_endpoints.h b/oxenss/rpc/client_rpc_endpoints.h index 90e0b17..45857d8 100644 --- a/oxenss/rpc/client_rpc_endpoints.h +++ b/oxenss/rpc/client_rpc_endpoints.h @@ -24,8 +24,6 @@ namespace oxen::rpc { using namespace std::literals; -constexpr std::string_view SUBKEY_HASH_KEY = "OxenSSSubkey"sv; - // Client rpc endpoints, accessible via the HTTPS storage_rpc endpoint, the OMQ // "storage.whatever" endpoints, and as the final target of an onion request. diff --git a/oxenss/rpc/request_handler.cpp b/oxenss/rpc/request_handler.cpp index 84bb9ff..c86f5f8 100644 --- a/oxenss/rpc/request_handler.cpp +++ b/oxenss/rpc/request_handler.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -274,26 +275,12 @@ namespace { return false; } - // Need to compute: (c + H(c || A)) A and use that instead of A for verification: - - // H(c || A): - crypto_generichash_state h_state; - crypto_generichash_init( - &h_state, - reinterpret_cast(SUBKEY_HASH_KEY.data()), - SUBKEY_HASH_KEY.size(), - 32); - crypto_generichash_update(&h_state, subkey->data(), 32); // c - crypto_generichash_update(&h_state, pk, 32); // A - crypto_generichash_final(&h_state, subkey_pub.data(), 32); - - // c + H(c || A): - crypto_core_ed25519_scalar_add(subkey_pub.data(), subkey->data(), subkey_pub.data()); - - // (c + H(c || A)) A: - if (0 != crypto_scalarmult_ed25519_noclamp(subkey_pub.data(), subkey_pub.data(), pk)) { - log::warning( - logcat, "Signature verification failed: invalid subkey multiplication"); + // Compute our verification key: (c + H("OxenSSSubkey" || c || A)) A and use that + // instead of A for verification: + try { + subkey_pub = crypto::subkey_verify_key(pk, subkey->data()); + } catch (const std::invalid_argument& ex) { + log::warning(logcat, "Signature verification failed: {}", ex.what()); return false; } pk = subkey_pub.data(); diff --git a/oxenss/server/CMakeLists.txt b/oxenss/server/CMakeLists.txt index 7eedc62..9d1dd3e 100644 --- a/oxenss/server/CMakeLists.txt +++ b/oxenss/server/CMakeLists.txt @@ -3,6 +3,7 @@ add_library(server STATIC https.cpp omq.cpp omq_logger.cpp + omq_monitor.cpp server_certificates.cpp) find_package(Threads) diff --git a/oxenss/server/omq.cpp b/oxenss/server/omq.cpp index 417e53a..f196926 100644 --- a/oxenss/server/omq.cpp +++ b/oxenss/server/omq.cpp @@ -1,6 +1,7 @@ #include "omq.h" #include +#include #include #include #include "utils.h" @@ -16,10 +17,13 @@ #include #include #include +#include #include +#include #include #include +#include #include namespace oxen::server { @@ -204,10 +208,6 @@ void OMQ::handle_get_stats(oxenmq::Message& message) { message.send_reply(payload); } -namespace { - -} // namespace - oxenc::bt_value json_to_bt(nlohmann::json j) { if (j.is_object()) { oxenc::bt_dict res; @@ -392,6 +392,12 @@ OMQ::OMQ( for (const auto& [name, _cb] : rpc::RequestHandler::client_rpc_endpoints) st_cat.add_request_command(std::string{name}, [this, name=name](auto& m) { handle_client_request(name, m); }); + // monitor.* endpoints are used to subscribe to events such as new messages arriving for an + // account. + omq_.add_category("monitor", oxenmq::AuthLevel::none, 1 /*reserved threads*/, 500 /*max queue*/) + .add_request_command("messages", [this](auto& m) { handle_monitor_messages(m); }) + ; + // Endpoints invokable by a local admin omq_.add_category("service", oxenmq::AuthLevel::admin) .add_request_command("get_stats", [this](auto& m) { handle_get_stats(m); }) diff --git a/oxenss/server/omq.h b/oxenss/server/omq.h index c0e3b7d..1e5f068 100644 --- a/oxenss/server/omq.h +++ b/oxenss/server/omq.h @@ -2,15 +2,18 @@ #include #include +#include #include #include +#include #include #include #include #include -#include +#include "../common/message.h" +#include "../snode/sn_record.h" namespace oxen::rpc { class RequestHandler; @@ -25,11 +28,36 @@ class ServiceNode; namespace oxen::server { +using namespace std::literals; + oxenc::bt_value json_to_bt(nlohmann::json j); nlohmann::json bt_to_json(oxenc::bt_dict_consumer d); nlohmann::json bt_to_json(oxenc::bt_list_consumer l); +struct MonitorData { + static constexpr auto MONITOR_EXPIRY_TIME = 65min; + + std::chrono::steady_clock::time_point expiry; // When this notify reg expires + std::vector namespaces; // sorted namespace_ids + oxenmq::ConnectionID push_conn; // ConnectionID to push notifications to + bool want_data; // true if the subscriber wants msg data + + MonitorData( + std::vector namespaces, + oxenmq::ConnectionID conn, + bool data, + std::chrono::seconds ttl = MONITOR_EXPIRY_TIME) : + expiry{std::chrono::steady_clock::now() + ttl}, + namespaces{std::move(namespaces)}, + push_conn{std::move(conn)}, + want_data{data} {} + + void reset_expiry(std::chrono::seconds ttl = MONITOR_EXPIRY_TIME) { + expiry = std::chrono::steady_clock::now() + ttl; + } +}; + class OMQ { oxenmq::OxenMQ omq_; oxenmq::ConnectionID oxend_conn_; @@ -41,6 +69,10 @@ class OMQ { rpc::RateLimiter* rate_limiter_ = nullptr; + // Tracks accounts we are monitoring for OMQ push notification messages + std::unordered_multimap monitoring_; + mutable std::shared_mutex monitoring_mutex_; + // Get node's address std::string peer_lookup(std::string_view pubkey_bin) const; @@ -83,6 +115,72 @@ class OMQ { void handle_client_request( std::string_view method, oxenmq::Message& message, bool forwarded = false); + /// Handles a subscription request to monitor new messages (OMQ endpoint monitor.messages). The + /// message body must be bt-encoded, and contains the following keys. Note that keys are + /// case-sensitive and, for proper bt-encoding, must be in ascii-sorted order (rather than the + /// order described here). Keys are: + /// - exactly one of: + /// - p -- the account public key, prefixed with the netid, in bytes (33 bytes). This should + /// be used for pubkeys that are ed keys (but not 05 session ids, see the next entry) + /// - P -- an ed25519 pubkey underlying a session ID, in bytes (32 bytes). The account + /// will be derived by converting to an x25519 pubkey and prepending the 0x05 byte. The + /// signature uses *this* key, not the derived x25519 key. + /// - S -- (optional) a 32-byte authentication subkey to use for authentication. The + /// signature with such a subkey uses a derived key (as described in the RPC endpoint + /// documentation). + /// - n -- list of namespace ids to monitor for new messages; the ids must be valid (i.e. -32768 + /// through 32767), must be sorted in numeric order, and must contain no duplicates. + /// - d -- set to 1 if the caller wants the full message data, 0 (or omitted) will omit the data + /// from notifications. + /// - t -- signature timestamp, in integer unix seconds (*not* milliseconds), associated with + /// the signature. This timestamp must be within the last 2 weeks (and no more than 1 day in + /// the future) for this request to be valid. + /// - s -- the signature associated with this message. This is an Ed25519 signature of the + /// value: + /// ( "MONITOR" || ACCOUNT || TS || D || NS[0] || "," || ... || "," || NS[n] ) + /// signed by the account Ed25519 key or derived subkey (if using subkey): + /// - ACCOUNT is the full account ID, expressed in hex (e.g. "0512345..."). + /// - TS is the signature timestamp value, expressed as a base-10 string + /// - D is "0" or "1" depending on whether data is wanted (i.e. the "d" request parameter) + /// - NS[i] are the namespace values from the request expressed as base-10 strings + /// + /// If the request validates then the connection is subscribed (for 65 minutes) to new incoming + /// messages in the given namespace(s). A caller should renew subscriptions periodically by + /// re-submitting the subscription request (with at most 1h between re-subscriptions). + /// + /// The reply to the subscription request is a bencoded dict containing keys: + /// - success -- included on successful subscription and set to the integer 1 + /// - errcode -- a numeric error value indicating the failure. Currently implemented are: + /// - 1 -- invalid arguments -- called for invalid data (e.g. wrong encoding, wrong value + /// type, or a missing required parameter) + /// - 2 -- invalid pubkey -- the given pubkey/session id is not a valid pubkey. + /// - 3 -- invalid namespace -- the namespaces provided are invalid (e.g. invalid value, not + /// sorted, or contains duplicates). + /// - 4 -- invalid timestamp -- the timestamp is not a valid integral timestamp, is too old, + /// or is in the future. + /// - 5 -- signature failed -- the signature failed to validate. + /// - 6 -- wrong swarm -- the given pubkey is not stored by this service node's swarm. + /// - error -- included whenever `errcode` is, this contains an English description of the + /// error. + /// + /// Each time a message is received the service node sends a message to the connection with a + /// first part (i.e. endpoint) of "notify.message", and second part containing the bt-encoded + /// message details in a dict with keys: + /// + /// - @ -- the account pubkey, in bytes (33). This is the actual account value, regardless of + /// which of `p`/`P`/`S` was used in the request. (Symbol so that it sorts very early). + /// - h -- the message hash + /// - n -- the message namespace (-32768 to 32767) + /// - t -- the message timestamp (milliseconds since unix epoch), as provided by the client who + /// deposited the message. + /// - z -- the expiry (milliseconds since unix epoch) of the message. + /// - ~d -- the message data. Note that this is only included if it was requested by specifying + /// `d` as 1 in the subscription request. (This is `~d` rather than `d` to put it at the end + /// of the dict, which makes construction here a little easier). + /// + /// Note that the client should accept (and ignore) unknown keys, to allow for future expansion. + void handle_monitor_messages(oxenmq::Message& message); + void handle_get_logs(oxenmq::Message& message); void handle_get_stats(oxenmq::Message& message); @@ -137,6 +235,9 @@ class OMQ { // Decodes onion request data; throws if invalid formatted or missing required fields. static std::pair decode_onion_data( std::string_view data); + + // Called during message submission to send notifications to anyone subscribed to them. + void send_notifies(message msg); }; } // namespace oxen::server diff --git a/oxenss/server/omq_monitor.cpp b/oxenss/server/omq_monitor.cpp new file mode 100644 index 0000000..0484f30 --- /dev/null +++ b/oxenss/server/omq_monitor.cpp @@ -0,0 +1,307 @@ +#include "omq.h" +#include "../common/namespace.h" +#include "../rpc/client_rpc_endpoints.h" +#include "../utils/time.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace oxen::server { + +using namespace std::literals; + +static auto logcat = log::Cat("monitor"); + +namespace { + + enum class MonitorResponse { + BAD_ARGS = 1, + BAD_PUBKEY = 2, + BAD_NS = 3, + BAD_TS = 4, + BAD_SIG = 5, + WRONG_SWARM = 6, + }; + + void monitor_error(oxenmq::Message& m, MonitorResponse r, std::string_view message) { + std::string buf; + buf.resize(message.size() + 30); + oxenc::bt_dict_producer d{buf.data(), buf.size()}; + d.append("errcode", static_cast>(r)); + d.append("error", message); + buf.resize(d.view().size()); + m.send_reply(buf); + } + +} // namespace + +void OMQ::handle_monitor_messages(oxenmq::Message& message) { + if (message.data.size() != 1) + return monitor_error( + message, + MonitorResponse::BAD_ARGS, + "Invalid arguments: monitor.messages takes a single bencoded dict parameter"); + const auto& m = message.data[0]; + if (m.size() < 2 || m.front() != 'd') + return monitor_error( + message, + MonitorResponse::BAD_ARGS, + "Invalid arguments: monitor.messages parameter must be a bencoded dict"); + oxenc::bt_dict_consumer d{m}; + + // Values we receive, in bt-dict order: + std::string_view ed_pk; // P + std::string_view subkey; // S + bool want_data = false; // d + using namespace_int = std::underlying_type_t; + std::vector namespaces; // n + std::string pubkey; // p + std::string_view signature; // s + std::chrono::seconds timestamp; // t + + try { + // Ed25519 pubkey for a Session ID (required if `p` is not present) + if (d.skip_until("P")) { + ed_pk = d.consume_string_view(); + if (ed_pk.size() != 32) + return monitor_error( + message, + MonitorResponse::BAD_PUBKEY, + "Provided P= Session Ed25519 pubkey must be 32 bytes"); + } + + // Subkey for subkey auth (optional) + if (d.skip_until("S")) { + subkey = d.consume_string_view(); + if (subkey.size() != 32) + return monitor_error( + message, + MonitorResponse::BAD_PUBKEY, + "Provided S= subkey must be 32 bytes"); + } + + // Send full data (optional) + if (d.skip_until("d")) + want_data = d.consume_integer(); + + // List of namespaces to monitor (required) + if (d.skip_until("n")) { + auto ns = d.consume_list_consumer(); + namespaces.push_back(static_cast(ns.consume_integer())); + while (!ns.is_finished()) { + auto nsi = static_cast(ns.consume_integer()); + if (nsi > namespaces.back()) + namespaces.push_back(nsi); + else + return monitor_error( + message, + MonitorResponse::BAD_NS, + "Invalid n= namespace list: namespaces must be ascending"); + } + } else { + throw std::runtime_error{"required namespace list is missing"}; + } + + if (d.skip_until("p")) { + if (!ed_pk.empty()) + throw std::runtime_error{"Cannot provide both p= and P= pubkey values"}; + pubkey = d.consume_string(); + if (pubkey.size() != 33) + monitor_error( + message, + MonitorResponse::BAD_PUBKEY, + "Provided p= pubkey must be 33 bytes"); + } else if (ed_pk.empty()) { + throw std::runtime_error{"Either p= or P= must be given"}; + } + + if (!d.skip_until("s")) + throw std::runtime_error{"required signature is missing"}; + signature = d.consume_string_view(); + if (signature.size() != 64) + return monitor_error( + message, MonitorResponse::BAD_SIG, "Provided s= signature must be 64 bytes"); + + if (!d.skip_until("t")) + throw std::runtime_error{"required signature timestamp is missing"}; + timestamp = std::chrono::seconds{d.consume_integer()}; + + } catch (const std::exception& ex) { + return monitor_error( + message, + MonitorResponse::BAD_ARGS, + fmt::format("Invalid arguments: invalid {}= value: {}", d.key(), ex.what())); + } + + // Make sure the sig timestamp isn't too old or too new + auto now = std::chrono::system_clock::now(); + auto ts = std::chrono::system_clock::time_point(timestamp); + if (bool too_old = ts < now - 14 * 24h; too_old || ts > now + 24h) { + return monitor_error( + message, + MonitorResponse::BAD_TS, + "Invalid t= signature timestamp: timestamp is "s + + (too_old ? "too old" : "in the future")); + } + + // If given an Ed25519 pubkey for a Session ID, derive the Session ID + if (!ed_pk.empty()) { + pubkey.resize(33); + pubkey[0] = 0x05; + if (auto rc = crypto_sign_ed25519_pk_to_curve25519( + reinterpret_cast(pubkey.data() + 1), + reinterpret_cast(ed_pk.data())); + rc != 0) + return monitor_error( + message, MonitorResponse::BAD_PUBKEY, "Invalid P= ed25519 public key"); + } else { + // No Session Ed25519, so assume the pubkey (without prefix byte) is Ed25519 + ed_pk = pubkey; + ed_pk.remove_prefix(1); + } + + std::string_view verify_key = ed_pk; + std::array subkey_pub; + if (!subkey.empty()) { + try { + subkey_pub = crypto::subkey_verify_key(ed_pk, subkey); + } catch (const std::invalid_argument& ex) { + auto m = fmt::format("Signature verification failed: {}", ex.what()); + log::warning(logcat, "{}", m); + return monitor_error(message, MonitorResponse::BAD_SIG, m); + } + verify_key = std::string_view{ + reinterpret_cast(subkey_pub.data()), subkey_pub.size()}; + } + assert(verify_key.size() == 32); + + auto pubkey_hex = oxenc::to_hex(pubkey); + + auto sig_msg = fmt::format( + "MONITOR{:s}{:d}{:d}{}", + pubkey_hex, + timestamp.count(), + want_data, + fmt::join(namespaces, ",")); + + if (0 != crypto_sign_verify_detached( + reinterpret_cast(signature.data()), + reinterpret_cast(sig_msg.data()), + sig_msg.size(), + reinterpret_cast(verify_key.data()))) { + log::debug(logcat, "monitor.messages signature verification failed"); + return monitor_error(message, MonitorResponse::BAD_SIG, "Signature verification failed"); + } + + { + std::unique_lock lock{monitoring_mutex_}; + bool found = false; + for (auto [it, end] = monitoring_.equal_range(pubkey); it != end; ++it) { + auto& mon_data = it->second; + if (mon_data.push_conn == message.conn) { + log::debug( + logcat, + "monitor.messages subscription renewed for {} monitoring namespace(s) {}", + pubkey_hex, + fmt::join(namespaces, ", ")); + mon_data.reset_expiry(); + mon_data.namespaces = std::move(namespaces); + mon_data.want_data = want_data; + found = true; + break; + } + } + if (!found) { + log::debug( + logcat, + "monitor.messages new subscription for {} monitoring namespace(s) {}", + pubkey_hex, + fmt::join(namespaces, ", ")); + monitoring_.emplace( + std::piecewise_construct, + std::forward_as_tuple(pubkey), + std::forward_as_tuple(std::move(namespaces), message.conn, want_data)); + } + } + + message.send_reply("d7:successi1ee"); +} + +static void write_metadata( + oxenc::bt_dict_producer& d, std::string_view pubkey, const message& msg) { + d.append("@", pubkey); + d.append("h", msg.hash); + d.append("n", to_int(msg.msg_namespace)); + d.append("t", to_epoch_ms(msg.timestamp)); + d.append("z", to_epoch_ms(msg.expiry)); +} + +void OMQ::send_notifies(message msg) { + auto pubkey = msg.pubkey.prefixed_raw(); + auto now = std::chrono::steady_clock::now(); + std::vector relay_to, relay_to_with_data; + { + std::shared_lock lock{monitoring_mutex_}; + for (auto [it, end] = monitoring_.equal_range(pubkey); it != end; ++it) { + const auto& mon_data = it->second; + if (mon_data.expiry >= now && + std::binary_search( + mon_data.namespaces.begin(), mon_data.namespaces.end(), msg.msg_namespace)) + (mon_data.want_data ? relay_to_with_data : relay_to).push_back(mon_data.push_conn); + } + } + + if (relay_to.empty() && relay_to_with_data.empty()) + return; + + // We output a dict with keys (in order): + // - @ pubkey + // - h msg hash + // - n msg namespace + // - t msg timestamp + // - z msg expiry + // - ~d msg data (optional) + constexpr size_t metadata_size = 2 // d...e + + 3 + 36 // 1:@ and 33:[33-byte pubkey] + + 3 + 46 // 1:h and 43:[43-byte base64 unpadded hash] + + 3 + 8 // 1:n and i-32768e + + 3 + 16 // 1:t and i1658784776010e plus a byte to grow + + 3 + 16 // 1:z and i1658784776010e plus a byte to grow + + 10; // safety margin + + std::string data; + if (!relay_to_with_data.empty()) + data.resize( + metadata_size // all the metadata above + + 3 // 1:~ + + 8 // 76800: plus a couple bytes to grow + + msg.data.size()); + else + data.resize(metadata_size); + + if (!relay_to.empty()) { + oxenc::bt_dict_producer d{data.data(), data.size()}; + write_metadata(d, pubkey, msg); + + for (const auto& conn : relay_to) + omq_.send(conn, "notify.message", data); + } + + if (!relay_to_with_data.empty()) { + oxenc::bt_dict_producer d{data.data(), data.size()}; + write_metadata(d, pubkey, msg); + d.append("~", msg.data); + + for (const auto& conn : relay_to_with_data) + omq_.send(conn, "notify.message", data); + } +} + +} // namespace oxen::server diff --git a/oxenss/snode/service_node.cpp b/oxenss/snode/service_node.cpp index 35c72fb..0a103bc 100644 --- a/oxenss/snode/service_node.cpp +++ b/oxenss/snode/service_node.cpp @@ -414,8 +414,11 @@ bool ServiceNode::process_store(message msg, bool* new_msg) { /// store in the database (if not already present) auto stored = db_->store(msg); - if (stored) + if (stored) { log::trace(logcat, *stored ? "saved message: {}" : "message already exists: {}", msg.data); + if (*stored) + omq_server_.send_notifies(std::move(msg)); + } if (new_msg) *new_msg = stored.value_or(false);