Merge pull request #3 from msgmaxim/merged

integration tests are working for dynamic swarms
This commit is contained in:
Beaudan Campbell-Brown 2019-03-22 14:08:40 +11:00 committed by GitHub
commit 7207e6a3c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 21078 additions and 76 deletions

20842
external/json.hpp vendored Normal file

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,7 @@ add_definitions(-DBOOST_LOG_DYN_LINK)
project(httpserver)
set(HEADER_FILES http_connection.h swarm.h service_node.h serialization.h)
set(HEADER_FILES http_connection.h swarm.h service_node.h serialization.h ../external/json.hpp)
set(SRC_FILES main.cpp http_connection.cpp swarm.cpp service_node.cpp serialization.cpp)
add_executable(httpserver ${HEADER_FILES} ${SRC_FILES})

View file

@ -1,6 +1,38 @@
#pragma once
/// TODO: this should be replaced with sn pub key in the future
using sn_record_t = std::string; // Snode address
struct sn_record_t {
uint16_t port;
std::string address; // Snode address
};
static std::string to_string(const sn_record_t& sn) {
std::string res;
#ifdef INTEGRATION_TEST
res += std::to_string(sn.port);
#else
res += sn.address;
#endif
return res;
}
static std::ostream& operator&&(std::ostream& os, const sn_record_t& sn) {
#ifdef INTEGRATION_TEST
return os << sn.port;
#else
return os << sn.address;
#endif
}
static bool operator==(const sn_record_t& lhs, const sn_record_t& rhs) {
#ifdef INTEGRATION_TEST
return lhs.port == rhs.port;
#else
return lhs.address == rhs.address;
#endif
}
static bool operator!=(const sn_record_t& lhs, const sn_record_t& rhs) {
return !operator==(lhs, rhs);
}
using swarm_id_t = uint64_t;

View file

@ -22,6 +22,8 @@
#include "serialization.h"
using json = nlohmann::json;
using tcp = boost::asio::ip::tcp; // from <boost/asio.hpp>
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
namespace pt = boost::property_tree; // from <boost/property_tree/>
@ -211,7 +213,6 @@ void connection_t::process_request() {
BOOST_LOG_TRIVIAL(trace) << "swarms/push";
/// NOTE:: we only expect one message here, but
/// for now lets reuse the function we already have
std::vector<message_t> messages = deserialize_messages(request_.body());
@ -323,12 +324,24 @@ bool connection_t::parse_header(T key_list) {
return true;
}
void connection_t::process_store(const pt::ptree& params) {
const auto pubKey = params.get<std::string>("pubKey");
const auto ttl = params.get<std::string>("ttl");
const auto nonce = params.get<std::string>("nonce");
const auto timestamp = params.get<std::string>("timestamp");
const auto data = params.get<std::string>("data");
void connection_t::process_store(const json& params) {
constexpr const char* fields[] = {"pubKey", "ttl", "nonce", "timestamp", "data"};
for (const auto& field : fields) {
if (!params.contains(field)) {
response_.result(http::status::bad_request);
bodyStream_ << boost::format("invalid json: no `%1%` field") % field;
BOOST_LOG_TRIVIAL(error) << boost::format("Bad client request: no `%1%` field") % field;
return;
}
}
const auto pubKey = params["pubKey"].get<std::string>();
const auto ttl = params["ttl"].get<std::string>();
const auto nonce = params["nonce"].get<std::string>();
const auto timestamp = params["timestamp"].get<std::string>();
const auto data = params["data"].get<std::string>();
BOOST_LOG_TRIVIAL(trace) << "store body: " << data;
@ -391,9 +404,56 @@ void connection_t::process_store(const pt::ptree& params) {
<< pubKey.substr(pubKey.length() - 3, pubKey.length() - 1);
}
void connection_t::process_retrieve(const pt::ptree& params) {
const auto pubKey = params.get<std::string>("pubKey");
const auto last_hash = params.get("lastHash", "");
void connection_t::process_snodes_by_pk(const json& params) {
if (!params.contains("pubKey")) {
response_.result(http::status::bad_request);
bodyStream_ << "invalid json: no `pubKey` field";
BOOST_LOG_TRIVIAL(error) << "Bad client request: no `pubKey` field";
return;
}
const auto pubKey = params["pubKey"].get<std::string>();
std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pubKey);
json res_body;
json snodes = json::array();
for (const auto& sn : nodes) {
#ifdef INTEGRATION_TEST
snodes.push_back(std::to_string(sn.port));
#else
snodes.push_back(sn.address);
#endif
}
res_body["snodes"] = snodes;
response_.result(http::status::ok);
response_.set(http::field::content_type, "application/json");
/// This might throw if not utf-8 endoded
bodyStream_ << res_body.dump();
}
void connection_t::process_retrieve(const json& params) {
constexpr const char* fields[] = {"pubKey", "lastHash"};
for (const auto& field : fields) {
if (!params.contains(field)) {
response_.result(http::status::bad_request);
bodyStream_ << boost::format("invalid json: no `%1%` field") % field;
BOOST_LOG_TRIVIAL(error) << boost::format("Bad client request: no `%1%` field") % field;
return;
}
}
const auto pubKey = params["pubKey"].get<std::string>();
const auto last_hash = params["lastHash"].get<std::string>();
std::vector<Item> items;
@ -452,28 +512,46 @@ void connection_t::process_client_req() {
bodyStream_ << "Could not decode/decrypt body: ";
bodyStream_ << e.what();
BOOST_LOG_TRIVIAL(error) << "Bad Request. Could not decrypt body";
return;
}
#endif
// parse json
pt::ptree root;
std::stringstream ss;
ss << plainText;
json body = json::parse(plainText, nullptr, false);
if (body == nlohmann::detail::value_t::discarded) {
response_.result(http::status::bad_request);
bodyStream_ << "invalid json";
BOOST_LOG_TRIVIAL(error) << "Bad client request: invalid json";
return;
}
/// TODO: this may throw, need to handle
pt::json_parser::read_json(ss, root);
const auto method_it = body.find("method");
if (method_it == body.end() || !method_it->is_string()) {
response_.result(http::status::bad_request);
bodyStream_ << "invalid json: no `method` field";
BOOST_LOG_TRIVIAL(error) << "Bad client request: no method field";
return;
}
const auto method_name = root.get("method", "");
const auto method_name = method_it->get<std::string>();
const auto params_it = body.find("params");
if (params_it == body.end() || !params_it->is_object()) {
response_.result(http::status::bad_request);
bodyStream_ << "invalid json: no `params` field";
BOOST_LOG_TRIVIAL(error) << "Bad client request: no params field";
return;
}
if (method_name == "store") {
process_store(root.get_child("params"));
process_store(*params_it);
} else if (method_name == "retrieve") {
process_retrieve(root.get_child("params"));
process_retrieve(*params_it);
} else if (method_name == "get_snodes_for_pubkey") {
process_snodes_by_pk(*params_it);
} else {
response_.result(http::status::bad_request);
bodyStream_ << "no method" << method_name;
BOOST_LOG_TRIVIAL(error)
<< "Bad Request. Unknown method '" << method_name << "'";
BOOST_LOG_TRIVIAL(error) << boost::format("Bad Request. Unknown method '%1%'") % method_name;
}
}

View file

@ -10,6 +10,7 @@
#include <boost/beast/version.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include "../external/json.hpp"
template <typename T>
class ChannelEncryption;
@ -119,9 +120,11 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
/// (synchronously).
void process_request();
void process_store(const pt::ptree& params);
void process_store(const nlohmann::json& params);
void process_retrieve(const pt::ptree& params);
void process_retrieve(const nlohmann::json& params);
void process_snodes_by_pk(const nlohmann::json& params);
/// Asynchronously transmit the response message.
void write_response();

View file

@ -109,7 +109,7 @@ int main(int argc, char* argv[]) {
boost::asio::io_context ioc{1};
loki::ServiceNode service_node(ioc, lokinetIdentityPath, dbLocation);
loki::ServiceNode service_node(ioc, port, lokinetIdentityPath, dbLocation);
ChannelEncryption<std::string> channelEncryption(lokinetIdentityPath);
/// Should run http server

View file

@ -100,6 +100,8 @@ std::string serialize_message(const message_t& msg) {
res += serialize_uint64(msg.timestamp_);
serialize(res, msg.nonce_);
BOOST_LOG_TRIVIAL(debug) << "serialized message: " << msg.text_ << std::endl;
return res;
}
@ -115,6 +117,8 @@ std::string serialize_message(const Item& item) {
res += serialize_uint64(item.timestamp);
serialize(res, item.nonce);
BOOST_LOG_TRIVIAL(debug) << "serialized message: " << item.bytes << std::endl;
return res;
}
@ -148,8 +152,6 @@ boost::optional<std::string> deserialize_string(string_view& slice) {
uint32_t len = deserialize_uint32(slice.it); // already increments `it`!
std::cerr << "len: " << len << std::endl;
if (slice.size() < len) return boost::none;
std::string res = std::string(slice.it, slice.it + len);
@ -208,7 +210,7 @@ std::vector<message_t> deserialize_messages(const std::string& blob) {
/// Deserialize TTL
auto ttl = deserialize_uint64(slice);
if (!ttl) {
BOOST_LOG_TRIVIAL(error) << "could not deserialize timestamp";
BOOST_LOG_TRIVIAL(error) << "could not deserialize ttl";
return {};
}
@ -222,15 +224,17 @@ std::vector<message_t> deserialize_messages(const std::string& blob) {
/// Deserialize Nonce
auto nonce = deserialize_string(slice);
if (!nonce) {
BOOST_LOG_TRIVIAL(error) << "could not deserialize data";
BOOST_LOG_TRIVIAL(error) << "could not deserialize nonce";
return {};
}
BOOST_LOG_TRIVIAL(trace) << "deserialized data: " << *data << std::endl;
BOOST_LOG_TRIVIAL(trace)
<< boost::format("pk: %2%, msg: %3%") % *pk % *data;
<< boost::format("pk: %1%, msg: %2%") % *pk % *data;
// TODO: Actually use the message values here
result.push_back({pk->c_str(), data->c_str(), hash->c_str(), 0, 0, ""});
result.push_back({pk->c_str(), data->c_str(), hash->c_str(), *ttl, *timestamp, nonce->c_str()});
}
BOOST_LOG_TRIVIAL(trace) << "=== END ===";

View file

@ -65,11 +65,13 @@ std::string hash_data(std::string data) {
return std::string(ss.str());
}
ServiceNode::ServiceNode(boost::asio::io_context& ioc, const std::string& identityPath,
ServiceNode::ServiceNode(boost::asio::io_context& ioc, uint16_t port, const std::string& identityPath,
const std::string& dbLocation)
: ioc_(ioc), db_(std::make_unique<Database>(dbLocation)),
: ioc_(ioc), db_(std::make_unique<Database>(dbLocation)), our_port_(port),
update_timer_(ioc, std::chrono::milliseconds(100)) {
#ifndef INTEGRATION_TEST
const std::vector<uint8_t> publicKey =
parseLokinetIdentityPublic(identityPath);
char buf[64] = {0};
@ -78,18 +80,22 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc, const std::string& identi
our_address.append(dest);
our_address.append(".snode");
}
our_address_ = our_address;
our_address_.address = our_address;
#else
our_address_.port = port;
#endif
update_timer_.async_wait(std::bind(&ServiceNode::update_swarms, this));
}
ServiceNode::~ServiceNode() = default;
/// make this async
void ServiceNode::relay_one(const message_ptr msg, sn_record_t address) const {
void ServiceNode::relay_one(const message_ptr msg, sn_record_t sn) const {
/// TODO: need to encrypt messages?
BOOST_LOG_TRIVIAL(debug) << "Relaying a message to " << address;
BOOST_LOG_TRIVIAL(debug) << "Relaying a message to " << to_string(sn);
request_t req;
req.body() = serialize_message(*msg);
@ -97,21 +103,21 @@ void ServiceNode::relay_one(const message_ptr msg, sn_record_t address) const {
req.target("/v1/swarms/push");
/// TODO: how to handle a failure here?
make_http_request(ioc_, address, SNODE_PORT, req,
make_http_request(ioc_, sn.address, sn.port, req,
[](std::shared_ptr<std::string>) {
});
}
void ServiceNode::relay_batch(const std::string& data, sn_record_t address) const {
void ServiceNode::relay_batch(const std::string& data, sn_record_t sn) const {
BOOST_LOG_TRIVIAL(debug) << "Relaying a batch to " << address;
BOOST_LOG_TRIVIAL(debug) << "Relaying a batch to " << to_string(sn);
request_t req;
req.body() = data;
req.target("/v1/swarms/push_all");
make_http_request(ioc_, address, SNODE_PORT, req,
make_http_request(ioc_, sn.address, sn.port, req,
[](std::shared_ptr<std::string>) {
});
@ -136,7 +142,8 @@ void ServiceNode::push_message(const message_ptr msg) {
/// do this asyncronously on a different thread? (on the same thread?)
bool ServiceNode::process_store(const message_ptr msg) {
// TODO: Enable swarm and push_message functionality again
/// TODO: accept messages if they are coming from other service nodes
/// only accept a message if we are in a swarm
if (!swarm_) {
std::cerr << "error: my swarm in not initialized" << std::endl;
@ -159,10 +166,6 @@ void ServiceNode::save_if_new(const message_ptr msg) {
db_->store(msg->hash_, msg->pk_, msg->text_, msg->ttl_, msg->timestamp_, msg->nonce_);
BOOST_LOG_TRIVIAL(debug) << "saving message: " << msg->text_;
/// just append this to a file for simplicity
std::ofstream file("db.txt", std::ios_base::app);
file << msg->pk_ << " " << msg->text_ << "\n";
}
void ServiceNode::on_swarm_update(std::shared_ptr<std::string> body) {
@ -200,7 +203,16 @@ void ServiceNode::on_swarm_update(std::shared_ptr<std::string> body) {
uint64_t swarm_id = stoull(nodes[0]);
for (auto i = 1u; i < nodes.size(); ++i) {
swarm_members.push_back(nodes[i]);
#ifdef INTEGRATION_TEST
/// TODO: error handling here
uint16_t port = stoi(nodes[i]);
std::string address = "0.0.0.0";
#else
uint16_t port = SNODE_PORT;
std::string address = nodes[i];
#endif
swarm_members.push_back({port, address});
}
SwarmInfo si;
@ -212,6 +224,7 @@ void ServiceNode::on_swarm_update(std::shared_ptr<std::string> body) {
}
if (!swarm_) {
BOOST_LOG_TRIVIAL(trace) << "initialized our swarm" << std::endl;
swarm_ = std::make_unique<Swarm>(our_address_);
}
@ -342,6 +355,8 @@ bool ServiceNode::retrieve(const std::string& pubKey,
std::string ServiceNode::get_all_messages(boost::optional<const std::string&> pk) {
BOOST_LOG_TRIVIAL(trace) << "get all messages";
pt::ptree messages;
std::vector<Item> all_entries;
@ -460,4 +475,24 @@ void ServiceNode::process_push_all(std::shared_ptr<std::string> blob) {
}
}
std::vector<sn_record_t> ServiceNode::get_snodes_by_pk(const std::string& pk) {
const auto& all_swarms = swarm_->all_swarms();
swarm_id_t swarm_id = get_swarm_by_pk(all_swarms, pk);
// TODO: have get_swarm_by_pk return idx into all_swarms instead,
// so we don't have to find it again
for (const auto& si : all_swarms) {
if (si.swarm_id == swarm_id) return si.snodes;
}
BOOST_LOG_TRIVIAL(fatal) << "Something went wrong in get_snodes_by_pk";
return {};
}
} // namespace loki

View file

@ -56,6 +56,8 @@ class ServiceNode {
std::unique_ptr<Swarm> swarm_;
std::unique_ptr<Database> db_;
uint16_t our_port_;
sn_record_t our_address_;
boost::asio::steady_timer update_timer_;
@ -88,7 +90,7 @@ class ServiceNode {
public:
ServiceNode(boost::asio::io_context& ioc, const std::string& identityPath,
ServiceNode(boost::asio::io_context& ioc, uint16_t port, const std::string& identityPath,
const std::string& dbLocation);
~ServiceNode();
@ -102,6 +104,8 @@ class ServiceNode {
/// Process incoming blob of messages: add to DB if new
void process_push_all(std::shared_ptr<std::string> blob);
std::vector<sn_record_t> get_snodes_by_pk(const std::string& pk);
/// remove all data that doesn't belong to this swarm
void purge_outdated();

View file

@ -71,35 +71,39 @@ SwarmEvents Swarm::update_swarms(const all_swarms_t& swarms) {
}
}
/// See if anyone joined our swarm
for (auto& sn : our_swarm) {
/// don't bother checking the rest
if (!events.decommissioned) {
auto it = std::find(swarm_peers_.begin(), swarm_peers_.end(), sn);
/// See if anyone joined our swarm
for (auto& sn : our_swarm) {
if (it == swarm_peers_.end()) {
BOOST_LOG_TRIVIAL(info) << "EVENT: detected new SN: " << sn;
events.new_snodes.push_back(sn);
}
}
auto it = std::find(swarm_peers_.begin(), swarm_peers_.end(), sn);
/// See if there are any new swarms
for (const auto& swarm_info : swarms) {
bool found = false;
for (const auto& prev_si : all_cur_swarms_) {
if (prev_si.swarm_id == swarm_info.swarm_id) {
found = true;
break;
if (it == swarm_peers_.end()) {
BOOST_LOG_TRIVIAL(info) << "EVENT: detected new SN: " << to_string(sn);
events.new_snodes.push_back(sn);
}
}
if (!found) {
BOOST_LOG_TRIVIAL(info)
<< "EVENT: detected a new swarm: " << swarm_info.swarm_id;
events.new_swarms.push_back(swarm_info.swarm_id);
/// See if there are any new swarms
for (const auto& swarm_info : swarms) {
bool found = false;
for (const auto& prev_si : all_cur_swarms_) {
if (prev_si.swarm_id == swarm_info.swarm_id) {
found = true;
break;
}
}
if (!found) {
BOOST_LOG_TRIVIAL(info)
<< "EVENT: detected a new swarm: " << swarm_info.swarm_id;
events.new_swarms.push_back(swarm_info.swarm_id);
}
}
}
}

View file

@ -176,11 +176,11 @@ bool Database::retrieve(const std::string& pubKey, std::vector<Item>& items,
sqlite3_stmt* stmt;
if (lastHash.empty()) {
if (pubKey.empty()) {
stmt = get_all_stmt;
} else if (lastHash.empty()) {
stmt = get_all_for_pk_stmt;
sqlite3_bind_text(stmt, 1, pubKey.c_str(), -1, SQLITE_STATIC);
} else if (pubKey.empty()) {
stmt = get_all_stmt;
} else {
stmt = get_stmt;
sqlite3_bind_text(stmt, 1, pubKey.c_str(), -1, SQLITE_STATIC);
@ -202,7 +202,7 @@ bool Database::retrieve(const std::string& pubKey, std::vector<Item>& items,
const auto time_expires = sqlite3_column_int64(stmt, 4);
const auto nonce = std::string((const char*)sqlite3_column_text(stmt, 5));
const auto bytes = std::string((char*)sqlite3_column_blob(stmt, 6), sqlite3_column_bytes(stmt, 6));
items.emplace_back(hash, pubKey, timestamp, ttl, time_expires, nonce, bytes);
items.emplace_back(hash, pub_key, timestamp, ttl, time_expires, nonce, bytes);
}
int rc = sqlite3_reset(stmt);