purge ptree and more robust client request handling

This commit is contained in:
Maxim Shishmarev 2019-03-22 16:47:18 +11:00
parent 7207e6a3c6
commit 55fc3c314f
6 changed files with 65 additions and 54 deletions

View file

@ -26,7 +26,6 @@ using json = nlohmann::json;
using tcp = boost::asio::ip::tcp; // from <boost/asio.hpp>
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
namespace pt = boost::property_tree; // from <boost/property_tree/>
using namespace service_node;
/// +===========================================
@ -229,8 +228,7 @@ void connection_t::process_request() {
response_.result(http::status::ok);
} else if (target == "/retrieve_all") {
bodyStream_ << service_node_.get_all_messages(boost::none);
response_.result(http::status::ok);
process_retrieve_all();
} else if (target == "/v1/swarms/push_all") {
response_.result(http::status::ok);
@ -413,7 +411,20 @@ void connection_t::process_snodes_by_pk(const json& params) {
return;
}
const auto pubKey = params["pubKey"].get<std::string>();
auto pubKey = params["pubKey"].get<std::string>();
if (pubKey.size() != 64) {
if (pubKey.size() == 66) {
/// Note signal prepends `05` to each pubkey
pubKey = pubKey.substr(2, std::string::npos);
} else {
response_.result(http::status::bad_request);
bodyStream_ << "invalid json: pubKey should be 64/66 characters long";
BOOST_LOG_TRIVIAL(error) << "Bad client request: no `pubKey` field";
return;
}
}
std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pubKey);
@ -439,6 +450,32 @@ void connection_t::process_snodes_by_pk(const json& params) {
}
void connection_t::process_retrieve_all() {
std::vector<Item> all_entries;
bool res = service_node_.get_all_messages(all_entries);
if (!res) {
response_.result(http::status::internal_server_error);
return;
}
json messages = json::array();
for (auto& entry : all_entries) {
json item;
item["data"] = entry.bytes;
messages.push_back(item);
}
json res_body;
res_body["messages"] = messages;
bodyStream_ << res_body.dump();
response_.result(http::status::ok);
}
void connection_t::process_retrieve(const json& params) {
constexpr const char* fields[] = {"pubKey", "lastHash"};
@ -467,28 +504,28 @@ void connection_t::process_retrieve(const json& params) {
return;
}
pt::ptree root;
pt::ptree messagesNode;
json res_body;
json messages = json::array();
for (const auto& item : items) {
pt::ptree messageNode;
messageNode.put("hash", item.hash);
messageNode.put("expiration", item.expirationTimestamp);
messageNode.put("data", item.bytes);
messagesNode.push_back(std::make_pair("", messageNode));
json message;
message["hash"] = item.hash;
message["expiration"] = item.expirationTimestamp;
message["data"] = item.bytes;
messages.push_back(message);
}
if (messagesNode.size() != 0) {
root.add_child("messages", messagesNode);
root.put("lastHash", items.back().hash);
res_body["messages"] = messages;
if (!items.empty()) {
BOOST_LOG_TRIVIAL(trace)
<< "Successfully retrieved messages for " << pubKey.substr(0, 2)
<< "..." << pubKey.substr(pubKey.length() - 3, pubKey.length() - 1);
}
std::ostringstream buf;
pt::write_json(buf, root);
response_.result(http::status::ok);
response_.set(http::field::content_type, "application/json");
bodyStream_ << buf.str();
bodyStream_ << res_body.dump();
}
void connection_t::process_client_req() {

View file

@ -8,8 +8,7 @@
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/format.hpp>
#include "../external/json.hpp"
template <typename T>
@ -20,14 +19,11 @@ class ServiceNode;
}
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
namespace pt = boost::property_tree; // from <boost/property_tree/>
using request_t = http::request<http::string_body>;
using http_callback_t = std::function<void(std::shared_ptr<std::string>)>;
using rpc_function = std::function<void(const pt::ptree&)>;
namespace loki {
void make_http_request(boost::asio::io_context& ioc, std::string ip,
@ -91,7 +87,6 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
/// TODO: move these if possible
std::map<std::string, std::string> header_;
std::map<std::string, rpc_function> rpc_endpoints_;
// The timer for putting a deadline on connection processing.
boost::asio::basic_waitable_timer<std::chrono::steady_clock> deadline_;
@ -126,6 +121,8 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
void process_snodes_by_pk(const nlohmann::json& params);
void process_retrieve_all();
/// Asynchronously transmit the response message.
void write_response();

View file

@ -12,6 +12,7 @@
#include <fstream>
#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
#include <boost/log/trivial.hpp>
@ -159,7 +160,7 @@ bool ServiceNode::process_store(const message_ptr msg) {
return true;
}
bool ServiceNode::process_push(const message_ptr msg) { save_if_new(msg); }
void ServiceNode::process_push(const message_ptr msg) { save_if_new(msg); }
void ServiceNode::save_if_new(const message_ptr msg) {
@ -353,33 +354,11 @@ bool ServiceNode::retrieve(const std::string& pubKey,
return db_->retrieve(pubKey, items, last_hash);
}
std::string ServiceNode::get_all_messages(boost::optional<const std::string&> pk) {
bool ServiceNode::get_all_messages(std::vector<Item>& all_entries) {
BOOST_LOG_TRIVIAL(trace) << "get all messages";
pt::ptree messages;
std::vector<Item> all_entries;
bool res = db_->retrieve(*pk, all_entries, "");
for (auto& entry : all_entries) {
pt::ptree msg_node;
msg_node.put("data", entry.bytes);
messages.push_back(std::make_pair("", msg_node));
}
pt::ptree root;
if (!res || messages.empty())
return "";
root.add_child("messages", messages);
std::ostringstream buf;
pt::write_json(buf, root);
return buf.str();
return db_->retrieve("", all_entries, "");
}
std::string ServiceNode::serialize_all() const {

View file

@ -99,7 +99,7 @@ class ServiceNode {
bool process_store(const message_ptr msg);
/// Process message relayed from another SN from our swarm
bool process_push(const message_ptr msg);
void process_push(const message_ptr msg);
/// Process incoming blob of messages: add to DB if new
void process_push_all(std::shared_ptr<std::string> blob);
@ -110,7 +110,7 @@ class ServiceNode {
void purge_outdated();
/// return all messages for a particular PK (in JSON)
std::string get_all_messages(boost::optional<const std::string&> pk);
bool get_all_messages(std::vector<service_node::storage::Item>& all_entries);
bool retrieve(const std::string& pubKey, const std::string& last_hash,
std::vector<service_node::storage::Item>& items);

View file

@ -119,9 +119,7 @@ SwarmEvents Swarm::update_swarms(const all_swarms_t& swarms) {
}
swarm_id_t get_swarm_by_pk(const std::vector<SwarmInfo>& all_swarms,
std::string pk) {
assert(pk.size() == 64);
const std::string& pk) {
// TODO: handle errors
// TODO: get rid of allocations?

View file

@ -24,7 +24,7 @@ struct SwarmInfo {
using all_swarms_t = std::vector<SwarmInfo>;
swarm_id_t get_swarm_by_pk(const std::vector<SwarmInfo>& all_swarms,
std::string pk);
const std::string& pk);
struct SwarmEvents {