diff --git a/httpserver/http_connection.cpp b/httpserver/http_connection.cpp index 6b43ed3..67f18af 100644 --- a/httpserver/http_connection.cpp +++ b/httpserver/http_connection.cpp @@ -26,7 +26,6 @@ using json = nlohmann::json; using tcp = boost::asio::ip::tcp; // from namespace http = boost::beast::http; // from -namespace pt = boost::property_tree; // from using namespace service_node; /// +=========================================== @@ -229,8 +228,7 @@ void connection_t::process_request() { response_.result(http::status::ok); } else if (target == "/retrieve_all") { - bodyStream_ << service_node_.get_all_messages(boost::none); - response_.result(http::status::ok); + process_retrieve_all(); } else if (target == "/v1/swarms/push_all") { response_.result(http::status::ok); @@ -413,7 +411,20 @@ void connection_t::process_snodes_by_pk(const json& params) { return; } - const auto pubKey = params["pubKey"].get(); + auto pubKey = params["pubKey"].get(); + + if (pubKey.size() != 64) { + + if (pubKey.size() == 66) { + /// Note signal prepends `05` to each pubkey + pubKey = pubKey.substr(2, std::string::npos); + } else { + response_.result(http::status::bad_request); + bodyStream_ << "invalid json: pubKey should be 64/66 characters long"; + BOOST_LOG_TRIVIAL(error) << "Bad client request: no `pubKey` field"; + return; + } + } std::vector nodes = service_node_.get_snodes_by_pk(pubKey); @@ -439,6 +450,32 @@ void connection_t::process_snodes_by_pk(const json& params) { } +void connection_t::process_retrieve_all() { + + std::vector all_entries; + + bool res = service_node_.get_all_messages(all_entries); + + if (!res) { + response_.result(http::status::internal_server_error); + return; + } + + json messages = json::array(); + + for (auto& entry : all_entries) { + json item; + item["data"] = entry.bytes; + messages.push_back(item); + } + + json res_body; + res_body["messages"] = messages; + + bodyStream_ << res_body.dump(); + response_.result(http::status::ok); +} + void connection_t::process_retrieve(const json& params) { constexpr const char* fields[] = {"pubKey", "lastHash"}; @@ -467,28 +504,28 @@ void connection_t::process_retrieve(const json& params) { return; } - pt::ptree root; - pt::ptree messagesNode; + json res_body; + json messages = json::array(); for (const auto& item : items) { - pt::ptree messageNode; - messageNode.put("hash", item.hash); - messageNode.put("expiration", item.expirationTimestamp); - messageNode.put("data", item.bytes); - messagesNode.push_back(std::make_pair("", messageNode)); + json message; + message["hash"] = item.hash; + message["expiration"] = item.expirationTimestamp; + message["data"] = item.bytes; + messages.push_back(message); } - if (messagesNode.size() != 0) { - root.add_child("messages", messagesNode); - root.put("lastHash", items.back().hash); + + res_body["messages"] = messages; + + if (!items.empty()) { BOOST_LOG_TRIVIAL(trace) << "Successfully retrieved messages for " << pubKey.substr(0, 2) << "..." << pubKey.substr(pubKey.length() - 3, pubKey.length() - 1); } - std::ostringstream buf; - pt::write_json(buf, root); + response_.result(http::status::ok); response_.set(http::field::content_type, "application/json"); - bodyStream_ << buf.str(); + bodyStream_ << res_body.dump(); } void connection_t::process_client_req() { diff --git a/httpserver/http_connection.h b/httpserver/http_connection.h index cc1746a..8c33b8c 100644 --- a/httpserver/http_connection.h +++ b/httpserver/http_connection.h @@ -8,8 +8,7 @@ #include #include #include -#include -#include +#include #include "../external/json.hpp" template @@ -20,14 +19,11 @@ class ServiceNode; } namespace http = boost::beast::http; // from -namespace pt = boost::property_tree; // from using request_t = http::request; using http_callback_t = std::function)>; -using rpc_function = std::function; - namespace loki { void make_http_request(boost::asio::io_context& ioc, std::string ip, @@ -91,7 +87,6 @@ class connection_t : public std::enable_shared_from_this { /// TODO: move these if possible std::map header_; - std::map rpc_endpoints_; // The timer for putting a deadline on connection processing. boost::asio::basic_waitable_timer deadline_; @@ -126,6 +121,8 @@ class connection_t : public std::enable_shared_from_this { void process_snodes_by_pk(const nlohmann::json& params); + void process_retrieve_all(); + /// Asynchronously transmit the response message. void write_response(); diff --git a/httpserver/service_node.cpp b/httpserver/service_node.cpp index 28d0e22..f088915 100644 --- a/httpserver/service_node.cpp +++ b/httpserver/service_node.cpp @@ -12,6 +12,7 @@ #include #include +#include #include @@ -159,7 +160,7 @@ bool ServiceNode::process_store(const message_ptr msg) { return true; } -bool ServiceNode::process_push(const message_ptr msg) { save_if_new(msg); } +void ServiceNode::process_push(const message_ptr msg) { save_if_new(msg); } void ServiceNode::save_if_new(const message_ptr msg) { @@ -353,33 +354,11 @@ bool ServiceNode::retrieve(const std::string& pubKey, return db_->retrieve(pubKey, items, last_hash); } -std::string ServiceNode::get_all_messages(boost::optional pk) { +bool ServiceNode::get_all_messages(std::vector& all_entries) { BOOST_LOG_TRIVIAL(trace) << "get all messages"; - pt::ptree messages; - - std::vector all_entries; - - bool res = db_->retrieve(*pk, all_entries, ""); - - for (auto& entry : all_entries) { - pt::ptree msg_node; - msg_node.put("data", entry.bytes); - messages.push_back(std::make_pair("", msg_node)); - } - - pt::ptree root; - - if (!res || messages.empty()) - return ""; - - root.add_child("messages", messages); - - std::ostringstream buf; - pt::write_json(buf, root); - - return buf.str(); + return db_->retrieve("", all_entries, ""); } std::string ServiceNode::serialize_all() const { diff --git a/httpserver/service_node.h b/httpserver/service_node.h index e1d5cac..86f3029 100644 --- a/httpserver/service_node.h +++ b/httpserver/service_node.h @@ -99,7 +99,7 @@ class ServiceNode { bool process_store(const message_ptr msg); /// Process message relayed from another SN from our swarm - bool process_push(const message_ptr msg); + void process_push(const message_ptr msg); /// Process incoming blob of messages: add to DB if new void process_push_all(std::shared_ptr blob); @@ -110,7 +110,7 @@ class ServiceNode { void purge_outdated(); /// return all messages for a particular PK (in JSON) - std::string get_all_messages(boost::optional pk); + bool get_all_messages(std::vector& all_entries); bool retrieve(const std::string& pubKey, const std::string& last_hash, std::vector& items); diff --git a/httpserver/swarm.cpp b/httpserver/swarm.cpp index 5c4ca74..939d2db 100644 --- a/httpserver/swarm.cpp +++ b/httpserver/swarm.cpp @@ -119,9 +119,7 @@ SwarmEvents Swarm::update_swarms(const all_swarms_t& swarms) { } swarm_id_t get_swarm_by_pk(const std::vector& all_swarms, - std::string pk) { - - assert(pk.size() == 64); + const std::string& pk) { // TODO: handle errors // TODO: get rid of allocations? diff --git a/httpserver/swarm.h b/httpserver/swarm.h index 2000564..912c43a 100644 --- a/httpserver/swarm.h +++ b/httpserver/swarm.h @@ -24,7 +24,7 @@ struct SwarmInfo { using all_swarms_t = std::vector; swarm_id_t get_swarm_by_pk(const std::vector& all_swarms, - std::string pk); + const std::string& pk); struct SwarmEvents {