mirror of https://github.com/oxen-io/oxen-mq.git
Add thread-safe timerid assignment version of add_timer()
I realized after merging the previous PR that it is difficult to correctly pass ownership into a timer, because something like: TimerID x = omq.add_timer([&] { omq.cancel_timer(x); }, 5ms); doesn't work when the timer job needs to outlive the caller. My next approach was: auto x = std::make_shared<TimerID>(); *x = omq.add_timer([&omq, x] { omq.cancel_timer(*x); }, 5ms); but this has two problems: first, TimerID wasn't default constructible, and second, there is no guarantee that the assignment to *x happens before (and is visible to) the access for the cancellation. This commit fixes both issues: TimerID is now default constructible, and an overload is added that takes the lvalue reference to the TimerID to set rather than returning it (and guarantees that it will be set before the timer is created).
This commit is contained in:
parent
3991f50547
commit
cdc6a9709c
|
@ -120,20 +120,25 @@ void OxenMQ::_queue_timer_job(int timer_id) {
|
|||
queue.emplace(static_cast<detail::Batch*>(b), 0);
|
||||
}
|
||||
|
||||
TimerID OxenMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
|
||||
int id = next_timer_id++;
|
||||
void OxenMQ::add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
|
||||
int th_id = thread ? thread->_id : 0;
|
||||
timer._id = next_timer_id++;
|
||||
if (proxy_thread.joinable()) {
|
||||
detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{
|
||||
id,
|
||||
timer._id,
|
||||
detail::serialize_object(std::move(job)),
|
||||
interval.count(),
|
||||
squelch,
|
||||
th_id}}));
|
||||
} else {
|
||||
proxy_timer(id, std::move(job), interval, squelch, th_id);
|
||||
proxy_timer(timer._id, std::move(job), interval, squelch, th_id);
|
||||
}
|
||||
return TimerID{id};
|
||||
}
|
||||
|
||||
TimerID OxenMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
|
||||
TimerID tid;
|
||||
add_timer(tid, std::move(job), interval, squelch, std::move(thread));
|
||||
return tid;
|
||||
}
|
||||
|
||||
void OxenMQ::proxy_timer_del(int id) {
|
||||
|
|
|
@ -104,10 +104,11 @@ private:
|
|||
template <typename R> friend class Batch;
|
||||
};
|
||||
|
||||
/// Opaque handler for a timer constructed by add_timer(...). Not directly constructible, but is
|
||||
/// safe (and cheap) to copy. The only real use of this is to pass it in to cancel_timer() to
|
||||
/// cancel a timer.
|
||||
/// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only
|
||||
/// real use of this is to pass it in to cancel_timer() to cancel a timer.
|
||||
struct TimerID {
|
||||
// Default construction; creates an object with a non-timer internal id value.
|
||||
TimerID() : _id{0} {}
|
||||
private:
|
||||
int _id;
|
||||
explicit constexpr TimerID(int id) : _id{id} {}
|
||||
|
@ -1264,6 +1265,25 @@ public:
|
|||
*/
|
||||
TimerID add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch = true, std::optional<TaggedThreadID> = std::nullopt);
|
||||
|
||||
/** Same as add_timer, above, except that it sets `timer` directly before adding the timer
|
||||
* rather than returning it.
|
||||
*
|
||||
* This is recommended over the above in cases where the timer is extremely fast *and*
|
||||
* cancellation will occur inside the job itself. This version of the method guarantees that
|
||||
* `timer` will be assigned to before the job is added to the job schedule so as to guarantee
|
||||
* that `job` can safely use `timer` without needing to synchronize the assignment with the
|
||||
* thread creating the timer.
|
||||
*
|
||||
* If in doubt and you need to cancel a job from within the job itself, use this method.
|
||||
*
|
||||
* Example usage:
|
||||
*
|
||||
* auto timer = std::make_shared<TimerID>();
|
||||
* auto& timer_ref = *timer; // Get reference before we move away the shared_ptr
|
||||
* omq.add_timer(timer_ref, [timer=std::move(timer)] { ...; cancel_timer(*timer); });
|
||||
*/
|
||||
void add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch = true, std::optional<TaggedThreadID> = std::nullopt);
|
||||
|
||||
/**
|
||||
* Cancels a running timer. Note that an existing timer job (or multiple, if the timer disabled
|
||||
* squelch) that have already been scheduled may still be executed after cancel_timer is called.
|
||||
|
|
|
@ -97,5 +97,35 @@ TEST_CASE("timer cancel", "[timer][cancel]") {
|
|||
auto lock = catch_lock();
|
||||
REQUIRE( ticks.load() == 3 );
|
||||
}
|
||||
|
||||
// Test the alternative taking an lvalue reference instead of returning by value (see oxenmq.h
|
||||
// for why this is sometimes needed).
|
||||
std::atomic<int> ticks3 = 0;
|
||||
std::weak_ptr<TimerID> w_timer3;
|
||||
{
|
||||
auto timer3 = std::make_shared<TimerID>();
|
||||
auto& t3ref = *timer3; // Get this reference *before* we move the shared pointer into the lambda
|
||||
omq.add_timer(t3ref, [&ticks3, &omq, timer3=std::move(timer3)] {
|
||||
if (ticks3 == 0)
|
||||
ticks3++;
|
||||
else if (ticks3 > 1) {
|
||||
omq.cancel_timer(*timer3);
|
||||
ticks3++;
|
||||
}
|
||||
}, 1ms);
|
||||
}
|
||||
|
||||
wait_for([&] { return ticks3.load() >= 1; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( ticks3.load() == 1 );
|
||||
}
|
||||
ticks3++;
|
||||
wait_for([&] { return ticks3.load() >= 3 && w_timer3.expired(); });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( ticks3.load() == 3 );
|
||||
REQUIRE( w_timer3.expired() );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue