Message monitoring

This PR adds endpoints to storage server that allows someone to maintain
a oxenmq connection to one or more swarm members and receive pushed
notifications through that connection when new messages are delivered.

This is authenticated: subscribing requires a signature from the mailbox
owner signed within the past 14 days, and connections have to be
refreshed at least once/hour to keep the push notifications alive.

The immediate use for this will be for more efficient push notifications
for mobile clients using the push notification server, but this
mechanism will eventually also allow clients (over lokinet) to get
messages pushed to them rather than having to frequently poll.
This commit is contained in:
Jason Rhinelander 2022-07-25 21:14:31 -03:00
parent a682570aed
commit f402017baa
No known key found for this signature in database
GPG key ID: C4992CE7A88D4262
10 changed files with 484 additions and 29 deletions

View file

@ -3,6 +3,7 @@
#include <cstdint>
#include <string>
#include <type_traits>
#include "formattable.h"
namespace oxen {
@ -28,4 +29,6 @@ std::string to_string(namespace_id ns);
constexpr auto NAMESPACE_MIN = to_int(namespace_id::Min);
constexpr auto NAMESPACE_MAX = to_int(namespace_id::Max);
template <> inline constexpr bool to_string_formattable<namespace_id> = true;
} // namespace oxen

View file

@ -8,7 +8,11 @@
#include <oxenc/base32z.h>
#include <oxenc/base64.h>
#include <oxenc/hex.h>
#include <sodium.h>
#include <sodium/crypto_core_ed25519.h>
#include <sodium/crypto_generichash.h>
#include <sodium/crypto_scalarmult_curve25519.h>
#include <sodium/crypto_scalarmult_ed25519.h>
#include <sodium/crypto_sign_ed25519.h>
namespace oxen::crypto {
@ -96,4 +100,41 @@ x25519_pubkey parse_x25519_pubkey(std::string_view pubkey_in) {
return parse_pubkey<x25519_pubkey>(pubkey_in);
}
std::array<unsigned char, 32> subkey_verify_key(std::string_view pubkey, std::string_view subkey) {
if (pubkey.size() != 32 || subkey.size() != 32)
throw std::invalid_argument{"Invalid pubkey/subkey: both must be 32 bytes"};
return subkey_verify_key(
reinterpret_cast<const unsigned char*>(pubkey.data()),
reinterpret_cast<const unsigned char*>(subkey.data()));
}
std::array<unsigned char, 32> subkey_verify_key(
const unsigned char* pubkey, const unsigned char* subkey) {
std::array<unsigned char, 32> subkey_pub;
// Need to compute: (c + H("OxenSSSubkey" || c || A)) A and use that instead of A for verification:
// H("OxenSSSubkey" || c || A):
crypto_generichash_state h_state;
crypto_generichash_init(
&h_state,
reinterpret_cast<const unsigned char*>(SUBKEY_HASH_KEY.data()),
SUBKEY_HASH_KEY.size(),
32);
crypto_generichash_update(&h_state, subkey, 32); // c
crypto_generichash_update(&h_state, pubkey, 32); // A
crypto_generichash_final(&h_state, subkey_pub.data(), 32);
// c + H(...):
crypto_core_ed25519_scalar_add(subkey_pub.data(), subkey, subkey_pub.data());
// (c + H(...)) A:
if (0 != crypto_scalarmult_ed25519_noclamp(subkey_pub.data(), subkey_pub.data(), pubkey))
throw std::invalid_argument{"Invalid pubkey/subkey combination"};
return subkey_pub;
}
} // namespace oxen::crypto

View file

@ -14,6 +14,8 @@ namespace oxen::crypto {
using namespace std::literals;
constexpr std::string_view SUBKEY_HASH_KEY = "OxenSSSubkey"sv;
namespace detail {
template <size_t Length>
inline constexpr std::array<unsigned char, Length> null_bytes = {0};
@ -91,6 +93,12 @@ legacy_pubkey parse_legacy_pubkey(std::string_view pubkey_in);
ed25519_pubkey parse_ed25519_pubkey(std::string_view pubkey_in);
x25519_pubkey parse_x25519_pubkey(std::string_view pubkey_in);
/// Computes the signature verification derived pubkey for a pubkey+subkey string. Throws
/// std::invalid_argument if the pubkey/subkey aren't valid.
std::array<unsigned char, 32> subkey_verify_key(std::string_view pubkey, std::string_view subkey);
std::array<unsigned char, 32> subkey_verify_key(
const unsigned char* pubkey, const unsigned char* subkey);
} // namespace oxen::crypto
template <>

View file

@ -24,8 +24,6 @@ namespace oxen::rpc {
using namespace std::literals;
constexpr std::string_view SUBKEY_HASH_KEY = "OxenSSSubkey"sv;
// Client rpc endpoints, accessible via the HTTPS storage_rpc endpoint, the OMQ
// "storage.whatever" endpoints, and as the final target of an onion request.

View file

@ -25,6 +25,7 @@
#include <sodium/crypto_scalarmult_curve25519.h>
#include <sodium/crypto_scalarmult_ed25519.h>
#include <sodium/crypto_sign.h>
#include <stdexcept>
#include <type_traits>
#include <variant>
@ -274,26 +275,12 @@ namespace {
return false;
}
// Need to compute: (c + H(c || A)) A and use that instead of A for verification:
// H(c || A):
crypto_generichash_state h_state;
crypto_generichash_init(
&h_state,
reinterpret_cast<const unsigned char*>(SUBKEY_HASH_KEY.data()),
SUBKEY_HASH_KEY.size(),
32);
crypto_generichash_update(&h_state, subkey->data(), 32); // c
crypto_generichash_update(&h_state, pk, 32); // A
crypto_generichash_final(&h_state, subkey_pub.data(), 32);
// c + H(c || A):
crypto_core_ed25519_scalar_add(subkey_pub.data(), subkey->data(), subkey_pub.data());
// (c + H(c || A)) A:
if (0 != crypto_scalarmult_ed25519_noclamp(subkey_pub.data(), subkey_pub.data(), pk)) {
log::warning(
logcat, "Signature verification failed: invalid subkey multiplication");
// Compute our verification key: (c + H("OxenSSSubkey" || c || A)) A and use that
// instead of A for verification:
try {
subkey_pub = crypto::subkey_verify_key(pk, subkey->data());
} catch (const std::invalid_argument& ex) {
log::warning(logcat, "Signature verification failed: {}", ex.what());
return false;
}
pk = subkey_pub.data();

View file

@ -3,6 +3,7 @@ add_library(server STATIC
https.cpp
omq.cpp
omq_logger.cpp
omq_monitor.cpp
server_certificates.cpp)
find_package(Threads)

View file

@ -1,6 +1,7 @@
#include "omq.h"
#include <oxenss/crypto/channel_encryption.hpp>
#include <oxenmq/auth.h>
#include <oxenss/crypto/keys.h>
#include <oxenss/logging/oxen_logger.h>
#include "utils.h"
@ -16,10 +17,13 @@
#include <oxenc/base64.h>
#include <oxenc/bt_serialize.h>
#include <oxenc/hex.h>
#include <oxenc/bt_producer.h>
#include <fmt/std.h>
#include <sodium/crypto_sign.h>
#include <optional>
#include <stdexcept>
#include <type_traits>
#include <variant>
namespace oxen::server {
@ -204,10 +208,6 @@ void OMQ::handle_get_stats(oxenmq::Message& message) {
message.send_reply(payload);
}
namespace {
} // namespace
oxenc::bt_value json_to_bt(nlohmann::json j) {
if (j.is_object()) {
oxenc::bt_dict res;
@ -392,6 +392,12 @@ OMQ::OMQ(
for (const auto& [name, _cb] : rpc::RequestHandler::client_rpc_endpoints)
st_cat.add_request_command(std::string{name}, [this, name=name](auto& m) { handle_client_request(name, m); });
// monitor.* endpoints are used to subscribe to events such as new messages arriving for an
// account.
omq_.add_category("monitor", oxenmq::AuthLevel::none, 1 /*reserved threads*/, 500 /*max queue*/)
.add_request_command("messages", [this](auto& m) { handle_monitor_messages(m); })
;
// Endpoints invokable by a local admin
omq_.add_category("service", oxenmq::AuthLevel::admin)
.add_request_command("get_stats", [this](auto& m) { handle_get_stats(m); })

View file

@ -2,15 +2,18 @@
#include <cstdint>
#include <memory>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
#include <nlohmann/json_fwd.hpp>
#include <oxenc/bt_serialize.h>
#include <oxenmq/oxenmq.h>
#include <oxenss/snode/sn_record.h>
#include "../common/message.h"
#include "../snode/sn_record.h"
namespace oxen::rpc {
class RequestHandler;
@ -25,11 +28,36 @@ class ServiceNode;
namespace oxen::server {
using namespace std::literals;
oxenc::bt_value json_to_bt(nlohmann::json j);
nlohmann::json bt_to_json(oxenc::bt_dict_consumer d);
nlohmann::json bt_to_json(oxenc::bt_list_consumer l);
struct MonitorData {
static constexpr auto MONITOR_EXPIRY_TIME = 65min;
std::chrono::steady_clock::time_point expiry; // When this notify reg expires
std::vector<namespace_id> namespaces; // sorted namespace_ids
oxenmq::ConnectionID push_conn; // ConnectionID to push notifications to
bool want_data; // true if the subscriber wants msg data
MonitorData(
std::vector<namespace_id> namespaces,
oxenmq::ConnectionID conn,
bool data,
std::chrono::seconds ttl = MONITOR_EXPIRY_TIME) :
expiry{std::chrono::steady_clock::now() + ttl},
namespaces{std::move(namespaces)},
push_conn{std::move(conn)},
want_data{data} {}
void reset_expiry(std::chrono::seconds ttl = MONITOR_EXPIRY_TIME) {
expiry = std::chrono::steady_clock::now() + ttl;
}
};
class OMQ {
oxenmq::OxenMQ omq_;
oxenmq::ConnectionID oxend_conn_;
@ -41,6 +69,10 @@ class OMQ {
rpc::RateLimiter* rate_limiter_ = nullptr;
// Tracks accounts we are monitoring for OMQ push notification messages
std::unordered_multimap<std::string, MonitorData> monitoring_;
mutable std::shared_mutex monitoring_mutex_;
// Get node's address
std::string peer_lookup(std::string_view pubkey_bin) const;
@ -83,6 +115,72 @@ class OMQ {
void handle_client_request(
std::string_view method, oxenmq::Message& message, bool forwarded = false);
/// Handles a subscription request to monitor new messages (OMQ endpoint monitor.messages). The
/// message body must be bt-encoded, and contains the following keys. Note that keys are
/// case-sensitive and, for proper bt-encoding, must be in ascii-sorted order (rather than the
/// order described here). Keys are:
/// - exactly one of:
/// - p -- the account public key, prefixed with the netid, in bytes (33 bytes). This should
/// be used for pubkeys that are ed keys (but not 05 session ids, see the next entry)
/// - P -- an ed25519 pubkey underlying a session ID, in bytes (32 bytes). The account
/// will be derived by converting to an x25519 pubkey and prepending the 0x05 byte. The
/// signature uses *this* key, not the derived x25519 key.
/// - S -- (optional) a 32-byte authentication subkey to use for authentication. The
/// signature with such a subkey uses a derived key (as described in the RPC endpoint
/// documentation).
/// - n -- list of namespace ids to monitor for new messages; the ids must be valid (i.e. -32768
/// through 32767), must be sorted in numeric order, and must contain no duplicates.
/// - d -- set to 1 if the caller wants the full message data, 0 (or omitted) will omit the data
/// from notifications.
/// - t -- signature timestamp, in integer unix seconds (*not* milliseconds), associated with
/// the signature. This timestamp must be within the last 2 weeks (and no more than 1 day in
/// the future) for this request to be valid.
/// - s -- the signature associated with this message. This is an Ed25519 signature of the
/// value:
/// ( "MONITOR" || ACCOUNT || TS || D || NS[0] || "," || ... || "," || NS[n] )
/// signed by the account Ed25519 key or derived subkey (if using subkey):
/// - ACCOUNT is the full account ID, expressed in hex (e.g. "0512345...").
/// - TS is the signature timestamp value, expressed as a base-10 string
/// - D is "0" or "1" depending on whether data is wanted (i.e. the "d" request parameter)
/// - NS[i] are the namespace values from the request expressed as base-10 strings
///
/// If the request validates then the connection is subscribed (for 65 minutes) to new incoming
/// messages in the given namespace(s). A caller should renew subscriptions periodically by
/// re-submitting the subscription request (with at most 1h between re-subscriptions).
///
/// The reply to the subscription request is a bencoded dict containing keys:
/// - success -- included on successful subscription and set to the integer 1
/// - errcode -- a numeric error value indicating the failure. Currently implemented are:
/// - 1 -- invalid arguments -- called for invalid data (e.g. wrong encoding, wrong value
/// type, or a missing required parameter)
/// - 2 -- invalid pubkey -- the given pubkey/session id is not a valid pubkey.
/// - 3 -- invalid namespace -- the namespaces provided are invalid (e.g. invalid value, not
/// sorted, or contains duplicates).
/// - 4 -- invalid timestamp -- the timestamp is not a valid integral timestamp, is too old,
/// or is in the future.
/// - 5 -- signature failed -- the signature failed to validate.
/// - 6 -- wrong swarm -- the given pubkey is not stored by this service node's swarm.
/// - error -- included whenever `errcode` is, this contains an English description of the
/// error.
///
/// Each time a message is received the service node sends a message to the connection with a
/// first part (i.e. endpoint) of "notify.message", and second part containing the bt-encoded
/// message details in a dict with keys:
///
/// - @ -- the account pubkey, in bytes (33). This is the actual account value, regardless of
/// which of `p`/`P`/`S` was used in the request. (Symbol so that it sorts very early).
/// - h -- the message hash
/// - n -- the message namespace (-32768 to 32767)
/// - t -- the message timestamp (milliseconds since unix epoch), as provided by the client who
/// deposited the message.
/// - z -- the expiry (milliseconds since unix epoch) of the message.
/// - ~d -- the message data. Note that this is only included if it was requested by specifying
/// `d` as 1 in the subscription request. (This is `~d` rather than `d` to put it at the end
/// of the dict, which makes construction here a little easier).
///
/// Note that the client should accept (and ignore) unknown keys, to allow for future expansion.
void handle_monitor_messages(oxenmq::Message& message);
void handle_get_logs(oxenmq::Message& message);
void handle_get_stats(oxenmq::Message& message);
@ -137,6 +235,9 @@ class OMQ {
// Decodes onion request data; throws if invalid formatted or missing required fields.
static std::pair<std::string_view, rpc::OnionRequestMetadata> decode_onion_data(
std::string_view data);
// Called during message submission to send notifications to anyone subscribed to them.
void send_notifies(message msg);
};
} // namespace oxen::server

View file

@ -0,0 +1,307 @@
#include "omq.h"
#include "../common/namespace.h"
#include "../rpc/client_rpc_endpoints.h"
#include "../utils/time.hpp"
#include <chrono>
#include <oxen/log.hpp>
#include <tuple>
#include <type_traits>
#include <utility>
#include <oxenc/bt_producer.h>
#include <oxenc/hex.h>
#include <sodium/crypto_sign.h>
#include <sodium/crypto_sign_ed25519.h>
namespace oxen::server {
using namespace std::literals;
static auto logcat = log::Cat("monitor");
namespace {
enum class MonitorResponse {
BAD_ARGS = 1,
BAD_PUBKEY = 2,
BAD_NS = 3,
BAD_TS = 4,
BAD_SIG = 5,
WRONG_SWARM = 6,
};
void monitor_error(oxenmq::Message& m, MonitorResponse r, std::string_view message) {
std::string buf;
buf.resize(message.size() + 30);
oxenc::bt_dict_producer d{buf.data(), buf.size()};
d.append("errcode", static_cast<std::underlying_type_t<MonitorResponse>>(r));
d.append("error", message);
buf.resize(d.view().size());
m.send_reply(buf);
}
} // namespace
void OMQ::handle_monitor_messages(oxenmq::Message& message) {
if (message.data.size() != 1)
return monitor_error(
message,
MonitorResponse::BAD_ARGS,
"Invalid arguments: monitor.messages takes a single bencoded dict parameter");
const auto& m = message.data[0];
if (m.size() < 2 || m.front() != 'd')
return monitor_error(
message,
MonitorResponse::BAD_ARGS,
"Invalid arguments: monitor.messages parameter must be a bencoded dict");
oxenc::bt_dict_consumer d{m};
// Values we receive, in bt-dict order:
std::string_view ed_pk; // P
std::string_view subkey; // S
bool want_data = false; // d
using namespace_int = std::underlying_type_t<namespace_id>;
std::vector<namespace_id> namespaces; // n
std::string pubkey; // p
std::string_view signature; // s
std::chrono::seconds timestamp; // t
try {
// Ed25519 pubkey for a Session ID (required if `p` is not present)
if (d.skip_until("P")) {
ed_pk = d.consume_string_view();
if (ed_pk.size() != 32)
return monitor_error(
message,
MonitorResponse::BAD_PUBKEY,
"Provided P= Session Ed25519 pubkey must be 32 bytes");
}
// Subkey for subkey auth (optional)
if (d.skip_until("S")) {
subkey = d.consume_string_view();
if (subkey.size() != 32)
return monitor_error(
message,
MonitorResponse::BAD_PUBKEY,
"Provided S= subkey must be 32 bytes");
}
// Send full data (optional)
if (d.skip_until("d"))
want_data = d.consume_integer<bool>();
// List of namespaces to monitor (required)
if (d.skip_until("n")) {
auto ns = d.consume_list_consumer();
namespaces.push_back(static_cast<namespace_id>(ns.consume_integer<namespace_int>()));
while (!ns.is_finished()) {
auto nsi = static_cast<namespace_id>(ns.consume_integer<namespace_int>());
if (nsi > namespaces.back())
namespaces.push_back(nsi);
else
return monitor_error(
message,
MonitorResponse::BAD_NS,
"Invalid n= namespace list: namespaces must be ascending");
}
} else {
throw std::runtime_error{"required namespace list is missing"};
}
if (d.skip_until("p")) {
if (!ed_pk.empty())
throw std::runtime_error{"Cannot provide both p= and P= pubkey values"};
pubkey = d.consume_string();
if (pubkey.size() != 33)
monitor_error(
message,
MonitorResponse::BAD_PUBKEY,
"Provided p= pubkey must be 33 bytes");
} else if (ed_pk.empty()) {
throw std::runtime_error{"Either p= or P= must be given"};
}
if (!d.skip_until("s"))
throw std::runtime_error{"required signature is missing"};
signature = d.consume_string_view();
if (signature.size() != 64)
return monitor_error(
message, MonitorResponse::BAD_SIG, "Provided s= signature must be 64 bytes");
if (!d.skip_until("t"))
throw std::runtime_error{"required signature timestamp is missing"};
timestamp = std::chrono::seconds{d.consume_integer<int64_t>()};
} catch (const std::exception& ex) {
return monitor_error(
message,
MonitorResponse::BAD_ARGS,
fmt::format("Invalid arguments: invalid {}= value: {}", d.key(), ex.what()));
}
// Make sure the sig timestamp isn't too old or too new
auto now = std::chrono::system_clock::now();
auto ts = std::chrono::system_clock::time_point(timestamp);
if (bool too_old = ts < now - 14 * 24h; too_old || ts > now + 24h) {
return monitor_error(
message,
MonitorResponse::BAD_TS,
"Invalid t= signature timestamp: timestamp is "s +
(too_old ? "too old" : "in the future"));
}
// If given an Ed25519 pubkey for a Session ID, derive the Session ID
if (!ed_pk.empty()) {
pubkey.resize(33);
pubkey[0] = 0x05;
if (auto rc = crypto_sign_ed25519_pk_to_curve25519(
reinterpret_cast<unsigned char*>(pubkey.data() + 1),
reinterpret_cast<const unsigned char*>(ed_pk.data()));
rc != 0)
return monitor_error(
message, MonitorResponse::BAD_PUBKEY, "Invalid P= ed25519 public key");
} else {
// No Session Ed25519, so assume the pubkey (without prefix byte) is Ed25519
ed_pk = pubkey;
ed_pk.remove_prefix(1);
}
std::string_view verify_key = ed_pk;
std::array<unsigned char, 32> subkey_pub;
if (!subkey.empty()) {
try {
subkey_pub = crypto::subkey_verify_key(ed_pk, subkey);
} catch (const std::invalid_argument& ex) {
auto m = fmt::format("Signature verification failed: {}", ex.what());
log::warning(logcat, "{}", m);
return monitor_error(message, MonitorResponse::BAD_SIG, m);
}
verify_key = std::string_view{
reinterpret_cast<const char*>(subkey_pub.data()), subkey_pub.size()};
}
assert(verify_key.size() == 32);
auto pubkey_hex = oxenc::to_hex(pubkey);
auto sig_msg = fmt::format(
"MONITOR{:s}{:d}{:d}{}",
pubkey_hex,
timestamp.count(),
want_data,
fmt::join(namespaces, ","));
if (0 != crypto_sign_verify_detached(
reinterpret_cast<const unsigned char*>(signature.data()),
reinterpret_cast<const unsigned char*>(sig_msg.data()),
sig_msg.size(),
reinterpret_cast<const unsigned char*>(verify_key.data()))) {
log::debug(logcat, "monitor.messages signature verification failed");
return monitor_error(message, MonitorResponse::BAD_SIG, "Signature verification failed");
}
{
std::unique_lock lock{monitoring_mutex_};
bool found = false;
for (auto [it, end] = monitoring_.equal_range(pubkey); it != end; ++it) {
auto& mon_data = it->second;
if (mon_data.push_conn == message.conn) {
log::debug(
logcat,
"monitor.messages subscription renewed for {} monitoring namespace(s) {}",
pubkey_hex,
fmt::join(namespaces, ", "));
mon_data.reset_expiry();
mon_data.namespaces = std::move(namespaces);
mon_data.want_data = want_data;
found = true;
break;
}
}
if (!found) {
log::debug(
logcat,
"monitor.messages new subscription for {} monitoring namespace(s) {}",
pubkey_hex,
fmt::join(namespaces, ", "));
monitoring_.emplace(
std::piecewise_construct,
std::forward_as_tuple(pubkey),
std::forward_as_tuple(std::move(namespaces), message.conn, want_data));
}
}
message.send_reply("d7:successi1ee");
}
static void write_metadata(
oxenc::bt_dict_producer& d, std::string_view pubkey, const message& msg) {
d.append("@", pubkey);
d.append("h", msg.hash);
d.append("n", to_int(msg.msg_namespace));
d.append("t", to_epoch_ms(msg.timestamp));
d.append("z", to_epoch_ms(msg.expiry));
}
void OMQ::send_notifies(message msg) {
auto pubkey = msg.pubkey.prefixed_raw();
auto now = std::chrono::steady_clock::now();
std::vector<oxenmq::ConnectionID> relay_to, relay_to_with_data;
{
std::shared_lock lock{monitoring_mutex_};
for (auto [it, end] = monitoring_.equal_range(pubkey); it != end; ++it) {
const auto& mon_data = it->second;
if (mon_data.expiry >= now &&
std::binary_search(
mon_data.namespaces.begin(), mon_data.namespaces.end(), msg.msg_namespace))
(mon_data.want_data ? relay_to_with_data : relay_to).push_back(mon_data.push_conn);
}
}
if (relay_to.empty() && relay_to_with_data.empty())
return;
// We output a dict with keys (in order):
// - @ pubkey
// - h msg hash
// - n msg namespace
// - t msg timestamp
// - z msg expiry
// - ~d msg data (optional)
constexpr size_t metadata_size = 2 // d...e
+ 3 + 36 // 1:@ and 33:[33-byte pubkey]
+ 3 + 46 // 1:h and 43:[43-byte base64 unpadded hash]
+ 3 + 8 // 1:n and i-32768e
+ 3 + 16 // 1:t and i1658784776010e plus a byte to grow
+ 3 + 16 // 1:z and i1658784776010e plus a byte to grow
+ 10; // safety margin
std::string data;
if (!relay_to_with_data.empty())
data.resize(
metadata_size // all the metadata above
+ 3 // 1:~
+ 8 // 76800: plus a couple bytes to grow
+ msg.data.size());
else
data.resize(metadata_size);
if (!relay_to.empty()) {
oxenc::bt_dict_producer d{data.data(), data.size()};
write_metadata(d, pubkey, msg);
for (const auto& conn : relay_to)
omq_.send(conn, "notify.message", data);
}
if (!relay_to_with_data.empty()) {
oxenc::bt_dict_producer d{data.data(), data.size()};
write_metadata(d, pubkey, msg);
d.append("~", msg.data);
for (const auto& conn : relay_to_with_data)
omq_.send(conn, "notify.message", data);
}
}
} // namespace oxen::server

View file

@ -414,8 +414,11 @@ bool ServiceNode::process_store(message msg, bool* new_msg) {
/// store in the database (if not already present)
auto stored = db_->store(msg);
if (stored)
if (stored) {
log::trace(logcat, *stored ? "saved message: {}" : "message already exists: {}", msg.data);
if (*stored)
omq_server_.send_notifies(std::move(msg));
}
if (new_msg)
*new_msg = stored.value_or(false);