Fix delayed proxy-scheduled batch jobs

Batch jobs scheduled by the proxy thread itself were delayed to the next
poll timeout (because nothing ever gets sent on a socket).  Add a
variable to bypass the next poll to handle this case.
This commit is contained in:
Jason Rhinelander 2020-02-11 19:08:19 -04:00
parent ccfb6d080b
commit 63e70f9912
8 changed files with 119 additions and 26 deletions

View File

@ -757,6 +757,7 @@ Batch<void>* LokiMQ::proxy_schedule_job(std::function<void()> f) {
b->add_job(std::move(f));
batches.insert(b);
batch_jobs.emplace(static_cast<detail::Batch*>(b), 0);
proxy_skip_poll = true;
return b;
}
@ -1013,11 +1014,16 @@ void LokiMQ::proxy_loop() {
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
}
// We poll the control socket and worker socket for any incoming messages. If we have
// available worker room then also poll incoming connections and outgoing connections for
// messages to forward to a worker. Otherwise, we just look for a control message or a
// worker coming back with a ready message.
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
if (proxy_skip_poll)
proxy_skip_poll = false;
else {
LMQ_LOG(trace, "polling for new messages");
// We poll the control socket and worker socket for any incoming messages. If we have
// available worker room then also poll incoming connections and outgoing connections
// for messages to forward to a worker. Otherwise, we just look for a control message
// or a worker coming back with a ready message.
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
LMQ_LOG(trace, "processing control messages");
// Retrieve any waiting incoming control messages
@ -1251,7 +1257,7 @@ bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector<zmq::message_t>& p
LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
return true;
}
LMQ_LOG(info, "Incoming client from ", peer_address(parts.back()), " send HI, replying with HELLO");
LMQ_LOG(info, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO");
send_routed_message(listener, route, "HELLO");
return true;
} else if (cmd == "HELLO") {

View File

@ -385,6 +385,10 @@ private:
/// Worker thread loop
void worker_thread(unsigned int index);
/// If set, skip polling for one proxy loop iteration (set when we know we have something
/// processible without having to shove it onto a socket, such as scheduling an internal job).
bool proxy_skip_poll = false;
/// Does the proxying work
void proxy_loop();

View File

@ -4,6 +4,7 @@ add_subdirectory(Catch2)
set(LMQ_TEST_SRC
main.cpp
test_batch.cpp
test_connect.cpp
test_commands.cpp
test_requests.cpp
)

32
tests/common.h Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include "lokimq/lokimq.h"
#include <catch2/catch.hpp>
using namespace lokimq;
static auto startup = std::chrono::steady_clock::now();
// Catch2 macros aren't thread safe, so guard with a mutex
inline std::unique_lock<std::mutex> catch_lock() {
static std::mutex mutex;
return std::unique_lock<std::mutex>{mutex};
}
inline LokiMQ::Logger get_logger(std::string prefix = "") {
std::string me = "tests/common.h";
std::string strip = __FILE__;
if (strip.substr(strip.size() - me.size()) == me)
strip.resize(strip.size() - me.size());
else
strip.clear();
return [prefix,strip](LogLevel lvl, std::string file, int line, std::string msg) {
if (!strip.empty() && file.substr(0, strip.size()) == strip)
file = file.substr(strip.size());
auto lock = catch_lock();
UNSCOPED_INFO(prefix << "[" << file << ":" << line << "/"
"+" << std::chrono::duration<double>(std::chrono::steady_clock::now() - startup).count() << "s]: "
<< lvl << ": " << msg);
};
}

View File

@ -1,7 +1,6 @@
#include "lokimq/lokimq.h"
#include "lokimq/batch.h"
#include "common.h"
#include <future>
#include <catch2/catch.hpp>
double do_my_task(int input) {
if (input % 10 == 7)

View File

@ -1,10 +1,9 @@
#include "lokimq/lokimq.h"
#include "common.h"
#include <future>
#include <catch2/catch.hpp>
using namespace lokimq;
TEST_CASE("basic commands", "[cmd-basic]") {
TEST_CASE("basic commands", "[commands]") {
std::string listen = "tcp://127.0.0.1:4567";
LokiMQ server{
"", "", // generate ephemeral keys
@ -12,7 +11,9 @@ TEST_CASE("basic commands", "[cmd-basic]") {
{listen},
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
get_logger("")
};
server.log_level(LogLevel::trace);
std::atomic<int> hellos{0}, his{0};
@ -24,10 +25,10 @@ TEST_CASE("basic commands", "[cmd-basic]") {
});
server.start();
LokiMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
);
//client.log_level(LogLevel::trace);
LokiMQ client{
get_logger("")
};
client.log_level(LogLevel::trace);
client.add_category("public", Access{AuthLevel::none});
client.add_command("public", "hi", [&](auto&) { his++; });
@ -41,24 +42,26 @@ TEST_CASE("basic commands", "[cmd-basic]") {
[&](string_view) { failed = true; },
server.get_pubkey());
for (int i = 0; i < 20; i++) {
int i;
for (i = 0; i < 5; i++) {
if (connected.load())
break;
std::this_thread::sleep_for(100ms);
std::this_thread::sleep_for(50ms);
}
REQUIRE( connected.load() );
REQUIRE( i <= 1 ); // should be fast
REQUIRE( !failed.load() );
REQUIRE( pubkey == server.get_pubkey() );
client.send(pubkey, "public.hello");
std::this_thread::sleep_for(200ms);
std::this_thread::sleep_for(50ms);
REQUIRE( hellos == 1 );
REQUIRE( his == 1 );
for (int i = 0; i < 50; i++)
client.send(pubkey, "public.hello");
std::this_thread::sleep_for(200ms);
std::this_thread::sleep_for(100ms);
REQUIRE( hellos == 51 );
REQUIRE( his == 26 );
}

48
tests/test_connect.cpp Normal file
View File

@ -0,0 +1,48 @@
#include "common.h"
#include <future>
TEST_CASE("connections", "[curve][connect]") {
std::string listen = "tcp://127.0.0.1:4455";
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
{listen},
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
get_logger("")
};
// server.log_level(LogLevel::trace);
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
server.start();
LokiMQ client{get_logger("")};
client.log_level(LogLevel::trace);
client.start();
auto pubkey = server.get_pubkey();
std::atomic<int> connected{0};
client.connect_remote(listen,
[&](std::string pk) { connected = 1; },
[&](string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); },
pubkey);
int i;
for (i = 0; i < 5; i++) {
if (connected.load())
break;
std::this_thread::sleep_for(50ms);
}
REQUIRE( i <= 1 );
REQUIRE( connected.load() );
bool success = false;
std::vector<std::string> parts;
client.request(pubkey, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; });
std::this_thread::sleep_for(50ms);
REQUIRE( success );
}

View File

@ -1,10 +1,9 @@
#include "lokimq/lokimq.h"
#include "common.h"
#include <future>
#include <catch2/catch.hpp>
using namespace lokimq;
TEST_CASE("basic requests", "[req-basic]") {
TEST_CASE("basic requests", "[requests]") {
std::string listen = "tcp://127.0.0.1:5678";
LokiMQ server{
"", "", // generate ephemeral keys
@ -37,13 +36,15 @@ TEST_CASE("basic requests", "[req-basic]") {
[&](string_view) { failed = true; },
server.get_pubkey());
for (int i = 0; i < 20; i++) {
int i;
for (i = 0; i < 5; i++) {
if (connected.load())
break;
std::this_thread::sleep_for(100ms);
std::this_thread::sleep_for(50ms);
}
REQUIRE( connected.load() );
REQUIRE( !failed.load() );
REQUIRE( i <= 1 );
REQUIRE( pubkey == server.get_pubkey() );
std::atomic<bool> got_reply{false};
@ -55,8 +56,7 @@ TEST_CASE("basic requests", "[req-basic]") {
data = std::move(data_);
});
// FIXME: we shouldn't need to wait this long (perhaps explore zmq send immediate?)
std::this_thread::sleep_for(1500ms);
std::this_thread::sleep_for(50ms);
REQUIRE( got_reply.load() );
REQUIRE( success );
REQUIRE( data == std::vector<std::string>{{"123"}} );