Modify task classes to support storing callback and pointer.

This allows registering continuations that are not coroutine
handles which can be useful for building some operators.
This comes at the expense of storage of an extra pointer per
coroutine and/or awaiter.

Adds the cppcoro::detail::resumer to wrap something that can
either be a callback + state or a coroutine_handle.
This commit is contained in:
Lewis Baker 2017-07-30 22:44:52 +09:30
parent 9dc7b36296
commit c22bef61a9
5 changed files with 99 additions and 34 deletions

View file

@ -0,0 +1,61 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_DETAIL_RESUMER_HPP_INCLUDED
#define CPPCORO_DETAIL_RESUMER_HPP_INCLUDED
#include <experimental/coroutine>
namespace cppcoro
{
namespace detail
{
class resumer
{
public:
using callback_t = void(void*);
resumer() noexcept
: m_callback(nullptr)
, m_state(nullptr)
{}
explicit resumer(std::experimental::coroutine_handle<> awaiter) noexcept
: m_callback(nullptr)
, m_state(awaiter.address())
{}
explicit resumer(callback_t* callback, void* state) noexcept
: m_callback(callback)
, m_state(state)
{}
explicit operator bool() const noexcept
{
return m_callback != nullptr || m_state != nullptr;
}
void resume() noexcept
{
if (m_callback == nullptr)
{
std::experimental::coroutine_handle<>::from_address(m_state).resume();
}
else
{
m_callback(m_state);
}
}
private:
callback_t* m_callback;
void* m_state;
};
}
}
#endif

View file

@ -7,6 +7,7 @@
#include <cppcoro/broken_promise.hpp>
#include <cppcoro/fmap.hpp>
#include <cppcoro/detail/resumer.hpp>
#include <atomic>
#include <exception>
@ -26,7 +27,7 @@ namespace cppcoro
public:
lazy_task_promise_base() noexcept
: m_awaiter(nullptr)
: m_resumer()
{}
auto initial_suspend() noexcept
@ -38,23 +39,23 @@ namespace cppcoro
{
struct awaitable
{
std::experimental::coroutine_handle<> m_awaiter;
resumer m_resumer;
awaitable(std::experimental::coroutine_handle<> awaiter) noexcept
: m_awaiter(awaiter)
awaitable(resumer resumer) noexcept
: m_resumer(resumer)
{}
bool await_ready() const noexcept { return false; }
void await_suspend([[maybe_unused]] std::experimental::coroutine_handle<> coroutine)
{
m_awaiter.resume();
m_resumer.resume();
}
void await_resume() noexcept {}
};
return awaitable{ m_awaiter };
return awaitable{ m_resumer };
}
void unhandled_exception() noexcept
@ -64,12 +65,12 @@ namespace cppcoro
bool is_ready() const noexcept
{
return static_cast<bool>(m_awaiter);
return static_cast<bool>(m_resumer);
}
void set_awaiter(std::experimental::coroutine_handle<> awaiter)
void set_resumer(resumer resumer)
{
m_awaiter = awaiter;
m_resumer = resumer;
}
protected:
@ -89,7 +90,7 @@ namespace cppcoro
private:
std::experimental::coroutine_handle<> m_awaiter;
resumer m_resumer;
std::exception_ptr m_exception;
};
@ -238,7 +239,7 @@ namespace cppcoro
void await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept
{
m_coroutine.promise().set_awaiter(awaiter);
m_coroutine.promise().set_resumer(detail::resumer{ awaiter });
m_coroutine.resume();
}
};

View file

@ -7,6 +7,7 @@
#include <cppcoro/broken_promise.hpp>
#include <cppcoro/lazy_task.hpp>
#include <cppcoro/detail/resumer.hpp>
#include <atomic>
#include <exception>
@ -24,7 +25,7 @@ namespace cppcoro
{
struct shared_lazy_task_waiter
{
std::experimental::coroutine_handle<> m_coroutine;
resumer m_resumer;
shared_lazy_task_waiter* m_next;
};
@ -74,15 +75,15 @@ namespace cppcoro
void* waiters = m_waiters.exchange(valueReadyValue, std::memory_order_acq_rel);
if (waiters != nullptr)
{
shared_lazy_task_waiter* next = static_cast<shared_lazy_task_waiter*>(waiters);
shared_lazy_task_waiter* waiter = static_cast<shared_lazy_task_waiter*>(waiters);
do
{
// Read the m_next pointer before resuming the coroutine
// since resuming the coroutine may destroy the shared_task_waiter value.
auto coroutine = next->m_coroutine;
next = next->m_next;
coroutine.resume();
} while (next != nullptr);
auto* next = waiter->m_next;
waiter->m_resumer.resume();
waiter = next;
} while (waiter != nullptr);
}
return awaitable{ *this };
@ -323,7 +324,7 @@ namespace cppcoro
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept
{
m_waiter.m_coroutine = awaiter;
m_waiter.m_resumer = detail::resumer{ awaiter };
return m_coroutine.promise().try_await(&m_waiter, m_coroutine);
}
};

View file

@ -7,6 +7,7 @@
#include <cppcoro/broken_promise.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/detail/resumer.hpp>
#include <atomic>
#include <exception>
@ -27,7 +28,7 @@ namespace cppcoro
{
struct shared_task_waiter
{
std::experimental::coroutine_handle<> m_coroutine;
cppcoro::detail::resumer m_resumer;
shared_task_waiter* m_next;
};
@ -73,15 +74,15 @@ namespace cppcoro
void* waiters = m_waiters.exchange(static_cast<void*>(this), std::memory_order_acq_rel);
if (waiters != nullptr)
{
shared_task_waiter* next = static_cast<shared_task_waiter*>(waiters);
shared_task_waiter* waiter = static_cast<shared_task_waiter*>(waiters);
do
{
// Read the m_next pointer before resuming the coroutine
// since resuming the coroutine may destroy the shared_task_waiter value.
auto coroutine = next->m_coroutine;
next = next->m_next;
coroutine.resume();
} while (next != nullptr);
auto* next = waiter->m_next;
waiter->m_resumer.resume();
waiter = next;
} while (waiter != nullptr);
}
return awaitable{ *this };
@ -287,7 +288,7 @@ namespace cppcoro
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept
{
m_waiter.m_coroutine = awaiter;
m_waiter.m_resumer = detail::resumer{ awaiter };
return m_coroutine.promise().try_await(&m_waiter);
}
};
@ -315,10 +316,10 @@ namespace cppcoro
shared_task(const shared_task& other) noexcept
: m_coroutine(other.m_coroutine)
{
if (m_coroutine)
{
m_coroutine.promise().add_ref();
}
if (m_coroutine)
{
m_coroutine.promise().add_ref();
}
}
~shared_task()

View file

@ -7,6 +7,7 @@
#include <cppcoro/broken_promise.hpp>
#include <cppcoro/fmap.hpp>
#include <cppcoro/detail/resumer.hpp>
#include <atomic>
#include <exception>
@ -57,7 +58,7 @@ namespace cppcoro
state oldState = m_promise.m_state.exchange(state::finished, std::memory_order_acq_rel);
if (oldState == state::consumer_suspended)
{
m_promise.m_awaiter.resume();
m_promise.m_resumer.resume();
}
return oldState != state::consumer_detached;
@ -91,9 +92,9 @@ namespace cppcoro
std::memory_order_acq_rel) == state::running;
}
bool try_await(std::experimental::coroutine_handle<> awaiter)
bool try_await(detail::resumer resumer)
{
m_awaiter = awaiter;
m_resumer = resumer;
state oldState = state::running;
return m_state.compare_exchange_strong(
@ -129,7 +130,7 @@ namespace cppcoro
};
std::atomic<state> m_state;
std::experimental::coroutine_handle<> m_awaiter;
cppcoro::detail::resumer m_resumer;
std::exception_ptr m_exception;
};
@ -251,7 +252,7 @@ namespace cppcoro
bool await_suspend(std::experimental::coroutine_handle<> awaiter) noexcept
{
return m_coroutine.promise().try_await(awaiter);
return m_coroutine.promise().try_await(detail::resumer{ awaiter });
}
};