#include "lokimq.h"
#include "batch.h"
#include "lokimq-internal.h"

namespace lokimq {

// NOTE(review): the explicit template argument lists in this file (e.g. on std::function,
// std::queue, std::get and the casts) were missing from the reviewed text and have been restored
// from how the values are used.  `batch_job` is assumed to be the element type of `batch_jobs`
// declared in lokimq.h, and `Batch<void>` the batch type for jobs returning nothing -- confirm
// both against the headers.

// Registers `batch` in `batches` and queues every job it contains: onto the general `batch_jobs`
// queue, or -- for a job that targets a tagged thread -- onto that tagged worker's own queue.
void LokiMQ::proxy_batch(detail::Batch* batch) {
    batches.insert(batch);
    const auto [jobs, tagged_threads] = batch->size();
    LMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
    if (!tagged_threads) {
        for (size_t i = 0; i < jobs; i++)
            batch_jobs.emplace(batch, i);
    } else {
        // Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
        // worker queue.
        auto threads = batch->threads();
        for (size_t i = 0; i < jobs; i++) {
            // Tagged thread ids are 1-based (0 means "no tagged thread"), hence the `- 1` when
            // indexing into tagged_workers.
            auto& queue = threads[i] > 0
                ? std::get<std::queue<batch_job>>(tagged_workers[threads[i] - 1])
                : batch_jobs;
            queue.emplace(batch, i);
        }
    }

    proxy_skip_one_poll = true;
}

// Wraps `f` in a freshly allocated single-job batch (optionally targeting the tagged thread
// `thread`) and sends the batch pointer, serialized as an integer, in a "BATCH" control message.
//
// Throws std::logic_error if `thread` has id -1: that id denotes an in-proxy job, which cannot be
// queued through this function.
void LokiMQ::job(std::function<void()> f, const TaggedThread* thread) {
    if (thread && thread->_id == -1)
        throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
    auto* b = new Batch<void>;
    b->add_job(std::move(f), thread);
    auto* baseptr = static_cast<detail::Batch*>(b);
    detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
}

// Wraps `f` in a freshly allocated single-job batch, registers it in `batches` and appends it to
// the `reply_jobs` queue (rather than `batch_jobs`).
void LokiMQ::proxy_schedule_reply_job(std::function<void()> f) {
    auto* b = new Batch<void>;
    b->add_job(std::move(f));
    batches.insert(b);
    reply_jobs.emplace(static_cast<detail::Batch*>(b), 0);
    proxy_skip_one_poll = true;
}

// Starts workers for the jobs at the front of `jobs` for as long as all of the following hold:
// the queue is non-empty, the total number of active workers is below `max_workers`, and either
// fewer than `reserved` jobs of this category are active or the total number of active workers is
// below `general_workers`.  `active` is incremented once for every job started; `reply` is
// forwarded to the worker's `load()`.
void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
    while (!jobs.empty() && active_workers() < max_workers &&
            (active < reserved || active_workers() < general_workers)) {
        proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
        jobs.pop();
        active++;
    }
}

// Called either within the proxy thread, or before the proxy thread has been created; actually adds
// the timer.  If the timer object hasn't been set up yet it gets set up here.
void LokiMQ::proxy_timer(std::function job, std::chrono::milliseconds interval, bool squelch, int thread) { if (!timers) timers.reset(zmq_timers_new()); int timer_id = zmq_timers_add(timers.get(), interval.count(), [](int timer_id, void* self) { static_cast(self)->_queue_timer_job(timer_id); }, this); if (timer_id == -1) throw zmq::error_t{}; timer_jobs[timer_id] = { std::move(job), squelch, false, thread }; } void LokiMQ::proxy_timer(bt_list_consumer timer_data) { std::unique_ptr> func{reinterpret_cast*>(timer_data.consume_integer())}; auto interval = std::chrono::milliseconds{timer_data.consume_integer()}; auto squelch = timer_data.consume_integer(); auto thread = timer_data.consume_integer(); if (!timer_data.is_finished()) throw std::runtime_error("Internal error: proxied timer request contains unexpected data"); proxy_timer(std::move(*func), interval, squelch, thread); } void LokiMQ::_queue_timer_job(int timer_id) { auto it = timer_jobs.find(timer_id); if (it == timer_jobs.end()) { LMQ_LOG(warn, "Could not find timer job ", timer_id); return; } auto& [func, squelch, running, thread] = it->second; if (squelch && running) { LMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running"); return; } if (thread == -1) { // Run directly in proxy thread try { func(); } catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } return; } auto* b = new Batch; b->add_job(func, thread); if (squelch) { running = true; b->completion([this,timer_id](auto results) { try { results[0].get(); } catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } catch (...) 
{ LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } auto it = timer_jobs.find(timer_id); if (it != timer_jobs.end()) it->second.running = false; }, &LokiMQ::run_in_proxy); } batches.insert(b); LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); assert(b->size() == std::make_pair(size_t{1}, thread > 0)); auto& queue = thread > 0 ? std::get>(tagged_workers[thread - 1]) : batch_jobs; queue.emplace(static_cast(b), 0); } void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch, const TaggedThread* thread) { int th_id = thread ? thread->_id : 0; if (proxy_thread.joinable()) { detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{ detail::serialize_object(std::move(job)), interval.count(), squelch, th_id}})); } else { proxy_timer(std::move(job), interval, squelch, th_id); } } void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); } TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function init, std::function start) { if (proxy_thread.joinable()) throw std::logic_error{"Cannot add tagged threads after calling `start()`"}; if (name == "_proxy"sv || name.empty() || name.find('\0') != std::string::npos) throw std::logic_error{"Invalid tagged thread name `" + name + "'"}; auto& [run, busy, queue] = tagged_workers.emplace_back(); busy = false; run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs) run.worker_routing_id = "t" + std::to_string(run.worker_id); LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id); run.worker_thread = std::thread{[this, id=run.worker_id, name, init=std::move(init), start=std::move(start)] { if (init) init(); return worker_thread(id, name, std::move(start)); }}; return {std::move(name), static_cast(run.worker_id)}; } }