From 061bdee0a8daee96037889afc6d7c920f913e057 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 11 Feb 2020 02:29:00 -0400 Subject: [PATCH] Add zmq timer support --- TODO.txt | 2 - lokimq/lokimq.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++++- lokimq/lokimq.h | 25 ++++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) diff --git a/TODO.txt b/TODO.txt index 9ec8511..5304c6e 100644 --- a/TODO.txt +++ b/TODO.txt @@ -2,5 +2,3 @@ - split out proxy code & data into a private lokimq/proxy.h header so that the main header doesn't need to include so much. -- timed, i.e. schedule this job to run in X time. This requires dynamically adjusting the polling - timeout to use the current value *or* the next timer (if sooner). diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 7809a6e..6bd58c7 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -697,6 +697,38 @@ void LokiMQ::proxy_batch(detail::Batch* batchptr) { batch_jobs.emplace(batch, i); } +Batch* LokiMQ::proxy_schedule_job(std::function f) { + auto* b = new Batch; + b->add_job(std::move(f)); + batches.insert(b); + batch_jobs.emplace(static_cast(b), 0); + return b; +} + +// Called either within the proxy thread, or before the proxy thread has been created; actually adds +// the timer. If the timer object hasn't been set up yet it gets set up here. +void LokiMQ::proxy_timer(std::function job, std::chrono::milliseconds interval, bool squelch) { + if (!timers) + timers.reset(zmq_timers_new()); + + int timer_id = zmq_timers_add(timers.get(), + interval.count(), + [](int timer_id, void* self) { static_cast(self)->_queue_timer_job(timer_id); }, + this); + if (timer_id == -1) + throw zmq::error_t{}; + timer_jobs[timer_id] = {std::move(job), squelch, false}; +} + +void LokiMQ::proxy_timer(bt_list_consumer timer_data) { + std::unique_ptr> func{reinterpret_cast*>(timer_data.consume_integer())}; + auto interval = std::chrono::milliseconds{timer_data.consume_integer()}; + auto squelch = timer_data.consume_integer(); + if (!timer_data.is_finished()) + throw std::runtime_error("Internal error: proxied timer request contains unexpected data"); + proxy_timer(std::move(*func), interval, squelch); +} + void LokiMQ::proxy_control_message(std::vector& parts) { if (parts.size() < 2) throw std::logic_error("Expected 2-3 message parts for a proxy control message"); @@ -716,6 +748,8 @@ void LokiMQ::proxy_control_message(std::vector& parts) { } else if (cmd == "CONNECT") { proxy_connect(bt_deserialize(view(parts[2]))); return; + } else if (cmd == "TIMER") { + return proxy_timer(view(parts[2])); } } else if (parts.size() == 2) { if (cmd == "START") { @@ -845,18 +879,23 @@ void LokiMQ::proxy_loop() { assert(pollitems.size() == poll_remote_offset); - constexpr auto poll_timeout = 5000ms; // Maximum time we spend in each poll constexpr auto timeout_check_interval = 10000ms; // Minimum time before for checking for connections to close since the last check auto last_conn_timeout = std::chrono::steady_clock::now(); + if (!timers) + timers.reset(zmq_timers_new()); std::vector parts; while (true) { + std::chrono::milliseconds poll_timeout; if (max_workers == 0) { // Will be 0 only if we are quitting if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.thread.joinable(); })) { // All the workers have finished, so we can finish shutting down return proxy_quit(); } + poll_timeout = 1s; // We don't keep running timers when we're quitting, so don't have a timer to check + } else { + poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())}; } // We poll the control socket and worker socket for any incoming messages. If we have @@ -876,6 +915,9 @@ void LokiMQ::proxy_loop() { proxy_worker_message(parts); } + LMQ_LOG(trace, "processing timers"); + zmq_timers_execute(timers.get()); + // Handle any zap authentication LMQ_LOG(trace, "processing zap requests"); process_zap_requests(zap_auth); @@ -1380,6 +1422,62 @@ inline void LokiMQ::job(std::function f) { detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast(baseptr))); } +void LokiMQ::_queue_timer_job(int timer_id) { + auto it = timer_jobs.find(timer_id); + if (it == timer_jobs.end()) { + LMQ_LOG(warn, "Could not find timer job ", timer_id); + return; + } + auto& timer = it->second; + auto& squelch = std::get<1>(timer); + auto& running = std::get<2>(timer); + if (squelch && running) { + LMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running"); + return; + } + + auto* b = new Batch; + b->add_job(std::get<0>(timer)); + if (squelch) { + running = true; + b->completion_proxy([this,timer_id](auto results) { + try { results[0].get(); } + catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } + catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } + auto it = timer_jobs.find(timer_id); + if (it != timer_jobs.end()) + std::get<2>(it->second)/*running*/ = false; + }); + } + batches.insert(b); + batch_jobs.emplace(static_cast(b), 0); + assert(b->size() == 1); +} + +void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch) { + if (proxy_thread.joinable()) { + auto *jobptr = new std::function{std::move(job)}; + detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{ + reinterpret_cast(jobptr), + interval.count(), + squelch}})); + } else { + proxy_timer(std::move(job), interval, squelch); + } +} + +void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); } + +std::ostream &operator<<(std::ostream &os, LogLevel lvl) { + os << (lvl == LogLevel::trace ? "trace" : + lvl == LogLevel::debug ? "debug" : + lvl == LogLevel::info ? "info" : + lvl == LogLevel::warn ? "warn" : + lvl == LogLevel::error ? "ERROR" : + lvl == LogLevel::fatal ? "FATAL" : + "unknown"); + return os; +} } diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index d3e3e1a..82bb52d 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -44,6 +44,11 @@ #include "bt_serialize.h" #include "string_view.h" +#if ZMQ_VERSION < ZMQ_MAKE_VERSION (4, 3, 0) +// Timers were not added until 4.3.0 +#error "ZMQ >= 4.3.0 required" +#endif + namespace lokimq { using namespace std::literals; @@ -312,6 +317,16 @@ private: /// or send a message. zmq::socket_t command{context, zmq::socket_type::router}; + /// Timers. TODO: once cppzmq adds an interface around the zmq C timers API then switch to it. + struct TimersDeleter { void operator()(void* timers); }; + std::unordered_map, bool, bool>> timer_jobs; // id => {func, squelch, running} + std::unique_ptr timers; +public: + // This needs to be public because we have to be able to call it from a plain C function. + // Nothing external may call it! + void _queue_timer_job(int); +private: + /// Router socket to reach internal worker threads from proxy zmq::socket_t workers_socket{context, zmq::socket_type::router}; @@ -340,6 +355,8 @@ private: void proxy_process_queue(); + Batch* proxy_schedule_job(std::function f); + /// Looks up a peers element given a zmq message (which has the pubkey and sn status metadata /// set during initial connection authentication), creating a new peer element if required. decltype(peers)::iterator proxy_lookup_peer(zmq::message_t& msg); @@ -401,6 +418,14 @@ private: /// take over and queue batch jobs. void proxy_batch(detail::Batch* batch); + /// TIMER command. Called with a serialized list containing: function pointer to assume + /// ownership of, an interval count (in ms), and whether or not jobs should be squelched (see + /// `add_timer()`). + void proxy_timer(bt_list_consumer timer_data); + + /// Same, but deserialized + void proxy_timer(std::function job, std::chrono::milliseconds interval, bool squelch); + /// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this is called with the /// zap auth socket to do non-blocking processing of any waiting authentication requests waiting /// on it to verify whether the connection is from a valid/allowed SN.