Make more functions work with arbitrary awaitables.
- Change fmap, schedule_on, resume_on, make_shared_task work with arbitrary awaitables. - Add make_task to wrap an arbitrary awaitable in a task<T>. - Remove specialisation of fmap() for task<T>, shared_task<T> as the generalised version obsoletes it.
This commit is contained in:
parent
a64b0e145d
commit
5ea774d1f9
|
@ -5,11 +5,119 @@
|
|||
#ifndef CPPCORO_FMAP_HPP_INCLUDED
|
||||
#define CPPCORO_FMAP_HPP_INCLUDED
|
||||
|
||||
#include <cppcoro/awaitable_traits.hpp>
|
||||
#include <cppcoro/is_awaitable.hpp>
|
||||
|
||||
#include <utility>
|
||||
#include <type_traits>
|
||||
#include <functional>
|
||||
|
||||
namespace cppcoro
|
||||
{
|
||||
namespace detail
|
||||
{
|
||||
template<typename FUNC, typename AWAITABLE>
|
||||
class fmap_awaiter
|
||||
{
|
||||
using awaiter_t = typename awaitable_traits<AWAITABLE&&>::awaiter_t;
|
||||
|
||||
public:
|
||||
|
||||
fmap_awaiter(FUNC&& func, AWAITABLE&& awaitable)
|
||||
noexcept(
|
||||
std::is_nothrow_move_constructible_v<awaiter_t> &&
|
||||
noexcept(detail::get_awaiter(static_cast<AWAITABLE&&>(awaitable))))
|
||||
: m_func(static_cast<FUNC&&>(func))
|
||||
, m_awaiter(detail::get_awaiter(static_cast<AWAITABLE&&>(awaitable)))
|
||||
{}
|
||||
|
||||
decltype(auto) await_ready()
|
||||
noexcept(noexcept(static_cast<awaiter_t&&>(m_awaiter).await_ready()))
|
||||
{
|
||||
return static_cast<awaiter_t&&>(m_awaiter).await_ready();
|
||||
}
|
||||
|
||||
template<typename PROMISE>
|
||||
decltype(auto) await_suspend(std::experimental::coroutine_handle<PROMISE> coro)
|
||||
noexcept(noexcept(static_cast<awaiter_t&&>(m_awaiter).await_suspend(std::move(coro))))
|
||||
{
|
||||
return static_cast<awaiter_t&&>(m_awaiter).await_suspend(std::move(coro));
|
||||
}
|
||||
|
||||
template<
|
||||
typename AWAIT_RESULT = decltype(std::declval<awaiter_t>().await_resume()),
|
||||
std::enable_if_t<std::is_void_v<AWAIT_RESULT>, int> = 0>
|
||||
decltype(auto) await_resume()
|
||||
noexcept(noexcept(std::invoke(static_cast<FUNC&&>(m_func))))
|
||||
{
|
||||
static_cast<awaiter_t&&>(m_awaiter).await_resume();
|
||||
return std::invoke(static_cast<FUNC&&>(m_func));
|
||||
}
|
||||
|
||||
template<
|
||||
typename AWAIT_RESULT = decltype(std::declval<awaiter_t>().await_resume()),
|
||||
std::enable_if_t<!std::is_void_v<AWAIT_RESULT>, int> = 0>
|
||||
decltype(auto) await_resume()
|
||||
noexcept(noexcept(std::invoke(static_cast<FUNC&&>(m_func), static_cast<awaiter_t&&>(m_awaiter).await_resume())))
|
||||
{
|
||||
return std::invoke(
|
||||
static_cast<FUNC&&>(m_func),
|
||||
static_cast<awaiter_t&&>(m_awaiter).await_resume());
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
FUNC&& m_func;
|
||||
awaiter_t m_awaiter;
|
||||
|
||||
};
|
||||
|
||||
template<typename FUNC, typename AWAITABLE>
|
||||
class fmap_awaitable
|
||||
{
|
||||
static_assert(!std::is_lvalue_reference_v<FUNC>);
|
||||
static_assert(!std::is_lvalue_reference_v<AWAITABLE>);
|
||||
public:
|
||||
|
||||
template<
|
||||
typename FUNC_ARG,
|
||||
typename AWAITABLE_ARG,
|
||||
std::enable_if_t<
|
||||
std::is_constructible_v<FUNC, FUNC_ARG&&> &&
|
||||
std::is_constructible_v<AWAITABLE, AWAITABLE_ARG&&>, int> = 0>
|
||||
explicit fmap_awaitable(FUNC_ARG&& func, AWAITABLE_ARG&& awaitable)
|
||||
noexcept(
|
||||
std::is_nothrow_constructible_v<FUNC, FUNC_ARG&&> &&
|
||||
std::is_nothrow_constructible_v<AWAITABLE, AWAITABLE_ARG&&>)
|
||||
: m_func(static_cast<FUNC_ARG&&>(func))
|
||||
, m_awaitable(static_cast<AWAITABLE_ARG&&>(awaitable))
|
||||
{}
|
||||
|
||||
auto operator co_await() const &
|
||||
{
|
||||
return fmap_awaiter<const FUNC&, const AWAITABLE&>(m_func, m_awaitable);
|
||||
}
|
||||
|
||||
auto operator co_await() &
|
||||
{
|
||||
return fmap_awaiter<FUNC&, AWAITABLE&>(m_func, m_awaitable);
|
||||
}
|
||||
|
||||
auto operator co_await() &&
|
||||
{
|
||||
return fmap_awaiter<FUNC&&, AWAITABLE&&>(
|
||||
static_cast<FUNC&&>(m_func),
|
||||
static_cast<AWAITABLE&&>(m_awaitable));
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
FUNC m_func;
|
||||
AWAITABLE m_awaitable;
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
template<typename FUNC>
|
||||
struct fmap_transform
|
||||
{
|
||||
|
@ -21,6 +129,19 @@ namespace cppcoro
|
|||
FUNC func;
|
||||
};
|
||||
|
||||
template<
|
||||
typename FUNC,
|
||||
typename AWAITABLE,
|
||||
std::enable_if_t<cppcoro::is_awaitable_v<AWAITABLE>, int> = 0>
|
||||
auto fmap(FUNC&& func, AWAITABLE&& awaitable)
|
||||
{
|
||||
return detail::fmap_awaitable<
|
||||
std::remove_cv_t<std::remove_reference_t<FUNC>>,
|
||||
std::remove_cv_t<std::remove_reference_t<AWAITABLE>>>(
|
||||
std::forward<FUNC>(func),
|
||||
std::forward<AWAITABLE>(awaitable));
|
||||
}
|
||||
|
||||
template<typename FUNC>
|
||||
auto fmap(FUNC&& func)
|
||||
{
|
||||
|
|
|
@ -6,8 +6,12 @@
|
|||
#define CPPCORO_RESUME_ON_HPP_INCLUDED
|
||||
|
||||
#include <cppcoro/task.hpp>
|
||||
#include <cppcoro/shared_task.hpp>
|
||||
#include <cppcoro/async_generator.hpp>
|
||||
#include <cppcoro/awaitable_traits.hpp>
|
||||
#include <cppcoro/detail/get_awaiter.hpp>
|
||||
|
||||
#include <exception>
|
||||
#include <type_traits>
|
||||
|
||||
namespace cppcoro
|
||||
{
|
||||
|
@ -33,20 +37,81 @@ namespace cppcoro
|
|||
return resume_on(transform.scheduler, std::forward<T>(value));
|
||||
}
|
||||
|
||||
template<typename SCHEDULER, typename T>
|
||||
task<T> resume_on(SCHEDULER& scheduler, task<T> task)
|
||||
template<
|
||||
typename SCHEDULER,
|
||||
typename AWAITABLE,
|
||||
typename AWAIT_RESULT = detail::remove_rvalue_reference_t<typename awaitable_traits<AWAITABLE>::await_result_t>,
|
||||
std::enable_if_t<!std::is_void_v<AWAIT_RESULT>, int> = 0>
|
||||
auto resume_on(SCHEDULER& scheduler, AWAITABLE awaitable)
|
||||
-> task<AWAIT_RESULT>
|
||||
{
|
||||
co_await task.when_ready();
|
||||
co_await scheduler.schedule();
|
||||
co_return co_await std::move(task);
|
||||
bool rescheduled = false;
|
||||
std::exception_ptr ex;
|
||||
try
|
||||
{
|
||||
// We manually get the awaiter here so that we can keep
|
||||
// it alive across the call to `scheduler.schedule()`
|
||||
// just in case the result is a reference to a value
|
||||
// in the awaiter that would otherwise be a temporary
|
||||
// and destructed before the value could be returned.
|
||||
|
||||
auto&& awaiter = detail::get_awaiter(static_cast<AWAITABLE&&>(awaitable));
|
||||
|
||||
auto&& result = co_await static_cast<decltype(awaiter)>(awaiter);
|
||||
|
||||
// Flag as rescheduled before scheduling in case it is the
|
||||
// schedule() operation that throws an exception as we don't
|
||||
// want to attempt to schedule twice if scheduling fails.
|
||||
rescheduled = true;
|
||||
|
||||
co_await scheduler.schedule();
|
||||
|
||||
co_return static_cast<decltype(result)>(result);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
// We still want to resume on the scheduler even in the presence
|
||||
// of an exception.
|
||||
if (!rescheduled)
|
||||
{
|
||||
co_await scheduler.schedule();
|
||||
}
|
||||
|
||||
std::rethrow_exception(ex);
|
||||
}
|
||||
|
||||
template<typename SCHEDULER, typename T>
|
||||
task<T> resume_on(SCHEDULER& scheduler, shared_task<T> task)
|
||||
template<
|
||||
typename SCHEDULER,
|
||||
typename AWAITABLE,
|
||||
typename AWAIT_RESULT = detail::remove_rvalue_reference_t<typename awaitable_traits<AWAITABLE>::await_result_t>,
|
||||
std::enable_if_t<std::is_void_v<AWAIT_RESULT>, int> = 0>
|
||||
auto resume_on(SCHEDULER& scheduler, AWAITABLE awaitable)
|
||||
-> task<>
|
||||
{
|
||||
co_await task.when_ready();
|
||||
std::exception_ptr ex;
|
||||
try
|
||||
{
|
||||
co_await static_cast<AWAITABLE&&>(awaitable);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
// NOTE: We're assuming that `schedule()` operation is noexcept
|
||||
// here. If it were to throw what would we do if 'ex' was non-null?
|
||||
// Presumably we'd treat it the same as throwing an exception while
|
||||
// unwinding and call std::terminate()?
|
||||
|
||||
co_await scheduler.schedule();
|
||||
co_return co_await std::move(task);
|
||||
|
||||
if (ex)
|
||||
{
|
||||
std::rethrow_exception(ex);
|
||||
}
|
||||
}
|
||||
|
||||
template<typename SCHEDULER, typename T>
|
||||
|
|
|
@ -8,6 +8,9 @@
|
|||
#include <cppcoro/task.hpp>
|
||||
#include <cppcoro/shared_task.hpp>
|
||||
#include <cppcoro/async_generator.hpp>
|
||||
#include <cppcoro/awaitable_traits.hpp>
|
||||
|
||||
#include <cppcoro/detail/remove_rvalue_reference.hpp>
|
||||
|
||||
namespace cppcoro
|
||||
{
|
||||
|
@ -33,11 +36,12 @@ namespace cppcoro
|
|||
return schedule_on(transform.scheduler, std::forward<T>(value));
|
||||
}
|
||||
|
||||
template<typename T, typename SCHEDULER>
|
||||
task<T> schedule_on(SCHEDULER& scheduler, task<T> task)
|
||||
template<typename SCHEDULER, typename AWAITABLE>
|
||||
auto schedule_on(SCHEDULER& scheduler, AWAITABLE awaitable)
|
||||
-> task<detail::remove_rvalue_reference_t<typename awaitable_traits<AWAITABLE>::await_result_t>>
|
||||
{
|
||||
co_await scheduler.schedule();
|
||||
co_return co_await std::move(task);
|
||||
co_return co_await std::move(awaitable);
|
||||
}
|
||||
|
||||
template<typename T, typename SCHEDULER>
|
||||
|
|
|
@ -6,10 +6,12 @@
|
|||
#define CPPCORO_SHARED_LAZY_TASK_HPP_INCLUDED
|
||||
|
||||
#include <cppcoro/config.hpp>
|
||||
#include <cppcoro/awaitable_traits.hpp>
|
||||
#include <cppcoro/broken_promise.hpp>
|
||||
#include <cppcoro/task.hpp>
|
||||
|
||||
#include <cppcoro/detail/continuation.hpp>
|
||||
#include <cppcoro/detail/remove_rvalue_reference.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <exception>
|
||||
|
@ -541,36 +543,11 @@ namespace cppcoro
|
|||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
shared_task<T> make_shared_task(task<T> t)
|
||||
template<typename AWAITABLE>
|
||||
auto make_shared_task(AWAITABLE awaitable)
|
||||
-> shared_task<detail::remove_rvalue_reference_t<typename awaitable_traits<AWAITABLE>::await_result_t>>
|
||||
{
|
||||
co_return co_await std::move(t);
|
||||
}
|
||||
|
||||
#if defined(_MSC_VER) && _MSC_FULL_VER <= 191025019 || CPPCORO_COMPILER_CLANG
|
||||
// HACK: Workaround for broken MSVC that doesn't execute <expr> in 'co_return <expr>;'.
|
||||
inline shared_task<void> make_shared_task(task<void> t)
|
||||
{
|
||||
co_await t;
|
||||
}
|
||||
#endif
|
||||
|
||||
// Note: We yield a task<> when applying fmap() operator to a shared_task<> since
|
||||
// it's not necessarily the case that because the source task was shared that the
|
||||
// result will be used in a shared context. So we choose to return a task<> which
|
||||
// generally has less overhead than a shared_task<>.
|
||||
|
||||
template<typename FUNC, typename T>
|
||||
task<std::result_of_t<FUNC&&(T&)>> fmap(FUNC func, shared_task<T> task)
|
||||
{
|
||||
co_return std::invoke(std::move(func), co_await std::move(task));
|
||||
}
|
||||
|
||||
template<typename FUNC>
|
||||
task<std::result_of_t<FUNC&&()>> fmap(FUNC func, shared_task<void> task)
|
||||
{
|
||||
co_await task;
|
||||
co_return std::invoke(std::move(func));
|
||||
co_return co_await static_cast<AWAITABLE&&>(awaitable);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,10 +6,11 @@
|
|||
#define CPPCORO_LAZY_TASK_HPP_INCLUDED
|
||||
|
||||
#include <cppcoro/config.hpp>
|
||||
#include <cppcoro/awaitable_traits.hpp>
|
||||
#include <cppcoro/broken_promise.hpp>
|
||||
#include <cppcoro/fmap.hpp>
|
||||
|
||||
#include <cppcoro/detail/continuation.hpp>
|
||||
#include <cppcoro/detail/remove_rvalue_reference.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <exception>
|
||||
|
@ -451,29 +452,11 @@ namespace cppcoro
|
|||
}
|
||||
}
|
||||
|
||||
// fmap() overloads for task<T>
|
||||
|
||||
template<typename FUNC, typename T>
|
||||
task<std::result_of_t<FUNC&&(T&&)>> fmap(FUNC func, task<T> t)
|
||||
template<typename AWAITABLE>
|
||||
auto make_task(AWAITABLE awaitable)
|
||||
-> task<detail::remove_rvalue_reference_t<typename awaitable_traits<AWAITABLE>::await_result_t>>
|
||||
{
|
||||
static_assert(
|
||||
!std::is_reference_v<FUNC>,
|
||||
"Passing by reference to task<T> coroutine is unsafe. "
|
||||
"Use std::ref or std::cref to explicitly pass by reference.");
|
||||
|
||||
co_return std::invoke(std::move(func), co_await std::move(t));
|
||||
}
|
||||
|
||||
template<typename FUNC>
|
||||
task<std::result_of_t<FUNC&&()>> fmap(FUNC func, task<> t)
|
||||
{
|
||||
static_assert(
|
||||
!std::is_reference_v<FUNC>,
|
||||
"Passing by reference to task<T> coroutine is unsafe. "
|
||||
"Use std::ref or std::cref to explicitly pass by reference.");
|
||||
|
||||
co_await t;
|
||||
co_return std::invoke(std::move(func));
|
||||
co_return co_await static_cast<AWAITABLE&&>(awaitable);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include <cppcoro/sync_wait.hpp>
|
||||
#include <cppcoro/when_all_ready.hpp>
|
||||
#include <cppcoro/single_consumer_event.hpp>
|
||||
#include <cppcoro/fmap.hpp>
|
||||
|
||||
#include "counted.hpp"
|
||||
|
||||
|
@ -203,7 +204,7 @@ TEST_CASE("shared_task<void> fmap operator")
|
|||
cppcoro::sync_wait(cppcoro::when_all_ready(
|
||||
[&]() -> cppcoro::task<>
|
||||
{
|
||||
cppcoro::task<std::string> numericStringTask =
|
||||
auto numericStringTask =
|
||||
setNumber()
|
||||
| cppcoro::fmap([&]() { return std::to_string(value); });
|
||||
|
||||
|
@ -231,7 +232,7 @@ TEST_CASE("shared_task<T> fmap operator")
|
|||
cppcoro::sync_wait(cppcoro::when_all_ready(
|
||||
[&]() -> cppcoro::task<>
|
||||
{
|
||||
cppcoro::task<std::string> numericStringTask =
|
||||
auto numericStringTask =
|
||||
getNumber()
|
||||
| cppcoro::fmap([](int x) { return std::to_string(x); });
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <cppcoro/single_consumer_event.hpp>
|
||||
#include <cppcoro/sync_wait.hpp>
|
||||
#include <cppcoro/when_all_ready.hpp>
|
||||
#include <cppcoro/fmap.hpp>
|
||||
|
||||
#include "counted.hpp"
|
||||
|
||||
|
@ -203,8 +204,6 @@ TEST_CASE("task<void> fmap pipe operator")
|
|||
|
||||
auto t = f() | fmap([] { return 123; });
|
||||
|
||||
CHECK(!t.is_ready());
|
||||
|
||||
cppcoro::sync_wait(cppcoro::when_all_ready(
|
||||
[&]() -> cppcoro::task<>
|
||||
{
|
||||
|
@ -215,8 +214,6 @@ TEST_CASE("task<void> fmap pipe operator")
|
|||
event.set();
|
||||
co_return;
|
||||
}()));
|
||||
|
||||
CHECK(t.is_ready());
|
||||
}
|
||||
|
||||
TEST_CASE("task<int> fmap pipe operator")
|
||||
|
@ -224,6 +221,7 @@ TEST_CASE("task<int> fmap pipe operator")
|
|||
using cppcoro::task;
|
||||
using cppcoro::fmap;
|
||||
using cppcoro::sync_wait;
|
||||
using cppcoro::make_task;
|
||||
|
||||
auto one = [&]() -> task<int>
|
||||
{
|
||||
|
@ -232,8 +230,8 @@ TEST_CASE("task<int> fmap pipe operator")
|
|||
|
||||
SUBCASE("r-value fmap / r-value lambda")
|
||||
{
|
||||
task<int> t = one() | fmap([delta = 1](auto i) { return i + delta; });
|
||||
CHECK(!t.is_ready());
|
||||
auto t = one()
|
||||
| fmap([delta = 1](auto i) { return i + delta; });
|
||||
CHECK(sync_wait(t) == 2);
|
||||
}
|
||||
|
||||
|
@ -241,20 +239,17 @@ TEST_CASE("task<int> fmap pipe operator")
|
|||
{
|
||||
using namespace std::string_literals;
|
||||
|
||||
task<std::string> t;
|
||||
|
||||
auto t = [&]
|
||||
{
|
||||
auto f = [prefix = "pfx"s](int x)
|
||||
{
|
||||
return prefix + std::to_string(x);
|
||||
};
|
||||
|
||||
// Want to make sure that the resulting task has taken
|
||||
// Want to make sure that the resulting awaitable has taken
|
||||
// a copy of the lambda passed to fmap().
|
||||
t = one() | fmap(f);
|
||||
}
|
||||
|
||||
CHECK(!t.is_ready());
|
||||
return one() | fmap(f);
|
||||
}();
|
||||
|
||||
CHECK(sync_wait(t) == "pfx1");
|
||||
}
|
||||
|
@ -263,20 +258,17 @@ TEST_CASE("task<int> fmap pipe operator")
|
|||
{
|
||||
using namespace std::string_literals;
|
||||
|
||||
task<std::string> t;
|
||||
|
||||
auto t = [&]
|
||||
{
|
||||
auto addprefix = fmap([prefix = "a really really long prefix that prevents small string optimisation"s](int x)
|
||||
{
|
||||
return prefix + std::to_string(x);
|
||||
});
|
||||
|
||||
// Want to make sure that the resulting task has taken
|
||||
// Want to make sure that the resulting awaitable has taken
|
||||
// a copy of the lambda passed to fmap().
|
||||
t = one() | addprefix;
|
||||
}
|
||||
|
||||
CHECK(!t.is_ready());
|
||||
return one() | addprefix;
|
||||
}();
|
||||
|
||||
CHECK(sync_wait(t) == "a really really long prefix that prevents small string optimisation1");
|
||||
}
|
||||
|
@ -297,7 +289,7 @@ TEST_CASE("task<int> fmap pipe operator")
|
|||
|
||||
// Want to make sure that the resulting task has taken
|
||||
// a copy of the lambda passed to fmap().
|
||||
t = one() | addprefix;
|
||||
t = make_task(one() | addprefix);
|
||||
}
|
||||
|
||||
CHECK(!t.is_ready());
|
||||
|
@ -331,8 +323,6 @@ TEST_CASE("chained fmap pipe operations")
|
|||
|
||||
auto t = asyncString("base"s) | prepend("pre_"s) | append("_post"s);
|
||||
|
||||
CHECK(!t.is_ready());
|
||||
|
||||
CHECK(sync_wait(t) == "pre_base_post");
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue