Refactor file_read/write_operation classes for Win32.

- Move common logic for Win32 OVERLAPPED-based operations out into
  new win32_overlapped_operation[_cancellable] base-classes.
  The concrete I/O operation classes are now greatly simplified
  due to the reduction of boilerplate.
- Added overloads for read() and write() functions that don't
  take a cancellation_token that return a simpler awaitable type
  that generates much simpler assembly.
This commit is contained in:
Lewis Baker 2017-12-27 19:36:14 +10:30
parent 0988627a84
commit 36be19a349
11 changed files with 642 additions and 554 deletions

View file

@ -14,6 +14,8 @@
#include <utility>
#include <cstdint>
struct _OVERLAPPED;
namespace cppcoro
{
namespace detail
@ -68,6 +70,29 @@ namespace cppcoro
win32::dword_t numberOfBytesTransferred,
win32::ulongptr_t completionKey);
io_state(callback_type* callback = nullptr) noexcept
: io_state(std::uint64_t(0), callback)
{}
io_state(void* pointer, callback_type* callback) noexcept
: m_callback(callback)
{
this->Internal = 0;
this->InternalHigh = 0;
this->Pointer = pointer;
this->hEvent = nullptr;
}
io_state(std::uint64_t offset, callback_type* callback) noexcept
: m_callback(callback)
{
this->Internal = 0;
this->InternalHigh = 0;
this->Offset = static_cast<dword_t>(offset);
this->OffsetHigh = static_cast<dword_t>(offset >> 32);
this->hEvent = nullptr;
}
callback_type* m_callback;
};

View file

@ -0,0 +1,374 @@
///////////////////////////////////////////////////////////////////////////////
// Copyright (c) Lewis Baker
// Licenced under MIT license. See LICENSE.txt for details.
///////////////////////////////////////////////////////////////////////////////
#ifndef CPPCORO_DETAIL_WIN32_OVERLAPPED_OPERATION_HPP_INCLUDED
#define CPPCORO_DETAIL_WIN32_OVERLAPPED_OPERATION_HPP_INCLUDED
#include <cppcoro/cancellation_registration.hpp>
#include <cppcoro/cancellation_token.hpp>
#include <cppcoro/operation_cancelled.hpp>
#include <cppcoro/detail/win32.hpp>
#include <optional>
#include <system_error>
#include <experimental/coroutine>
#include <cassert>
namespace cppcoro
{
namespace detail
{
class win32_overlapped_operation_base
: protected detail::win32::io_state
{
protected:
win32_overlapped_operation_base(
detail::win32::io_state::callback_type* callback) noexcept
: detail::win32::io_state(callback)
, m_errorCode(0)
, m_numberOfBytesTransferred(0)
{}
win32_overlapped_operation_base(
void* pointer,
detail::win32::io_state::callback_type* callback) noexcept
: detail::win32::io_state(pointer, callback)
, m_errorCode(0)
, m_numberOfBytesTransferred(0)
{}
win32_overlapped_operation_base(
std::uint64_t offset,
detail::win32::io_state::callback_type* callback) noexcept
: detail::win32::io_state(offset, callback)
, m_errorCode(0)
, m_numberOfBytesTransferred(0)
{}
_OVERLAPPED* get_overlapped() noexcept
{
return reinterpret_cast<_OVERLAPPED*>(
static_cast<detail::win32::overlapped*>(this));
}
std::size_t get_result()
{
if (m_errorCode != 0)
{
throw std::system_error{
static_cast<int>(m_errorCode),
std::system_category()
};
}
return m_numberOfBytesTransferred;
}
detail::win32::dword_t m_errorCode;
detail::win32::dword_t m_numberOfBytesTransferred;
};
template<typename OPERATION>
class win32_overlapped_operation
: protected win32_overlapped_operation_base
{
protected:
win32_overlapped_operation() noexcept
: win32_overlapped_operation_base(
&win32_overlapped_operation::on_operation_completed)
{}
win32_overlapped_operation(void* pointer) noexcept
: win32_overlapped_operation_base(
pointer,
&win32_overlapped_operation::on_operation_completed)
{}
win32_overlapped_operation(std::uint64_t offset) noexcept
: win32_overlapped_operation_base(
offset,
&win32_overlapped_operation::on_operation_completed)
{}
public:
bool await_ready() const noexcept { return false; }
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine)
{
static_assert(std::is_base_of_v<win32_overlapped_operation, OPERATION>);
m_awaitingCoroutine = awaitingCoroutine;
return static_cast<OPERATION*>(this)->try_start();
}
decltype(auto) await_resume()
{
return static_cast<OPERATION*>(this)->get_result();
}
private:
static void on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
[[maybe_unused]] detail::win32::ulongptr_t completionKey) noexcept
{
auto* operation = static_cast<win32_overlapped_operation*>(ioState);
operation->m_errorCode = errorCode;
operation->m_numberOfBytesTransferred = numberOfBytesTransferred;
operation->m_awaitingCoroutine.resume();
}
std::experimental::coroutine_handle<> m_awaitingCoroutine;
};
template<typename OPERATION>
class win32_overlapped_operation_cancellable
: protected win32_overlapped_operation_base
{
// ERROR_OPERATION_ABORTED value from <Windows.h>
static constexpr detail::win32::dword_t error_operation_aborted = 995L;
protected:
win32_overlapped_operation_cancellable(cancellation_token&& ct) noexcept
: win32_overlapped_operation_base(&win32_overlapped_operation_cancellable::on_operation_completed)
, m_state(ct.is_cancellation_requested() ? state::completed : state::not_started)
, m_cancellationToken(std::move(ct))
{
m_errorCode = error_operation_aborted;
}
win32_overlapped_operation_cancellable(
void* pointer,
cancellation_token&& ct) noexcept
: win32_overlapped_operation_base(pointer, &win32_overlapped_operation_cancellable::on_operation_completed)
, m_state(ct.is_cancellation_requested() ? state::completed : state::not_started)
, m_cancellationToken(std::move(ct))
{
m_errorCode = error_operation_aborted;
}
win32_overlapped_operation_cancellable(
std::uint64_t offset,
cancellation_token&& ct) noexcept
: win32_overlapped_operation_base(offset, &win32_overlapped_operation_cancellable::on_operation_completed)
, m_state(ct.is_cancellation_requested() ? state::completed : state::not_started)
, m_cancellationToken(std::move(ct))
{
m_errorCode = error_operation_aborted;
}
win32_overlapped_operation_cancellable(
win32_overlapped_operation_cancellable&& other) noexcept
: win32_overlapped_operation_base(std::move(other))
, m_state(other.m_state.load(std::memory_order_relaxed))
, m_cancellationToken(std::move(other.m_cancellationToken))
, m_errorCode(other.m_errorCode)
, m_numberOfBytesTransferred(other.m_numberOfBytesTransferred)
{
}
public:
bool await_ready() const noexcept
{
return m_state.load(std::memory_order_relaxed) == state::completed;
}
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine)
{
static_assert(std::is_base_of_v<win32_overlapped_operation_cancellable, OPERATION>);
m_awaitingCoroutine = awaitingCoroutine;
// TRICKY: Register cancellation callback before starting the operation
// in case the callback registration throws due to insufficient
// memory. We need to make sure that the logic that occurs after
// starting the operation is noexcept, otherwise we run into the
// problem of not being able to cancel the started operation and
// the dilemma of what to do with the exception.
//
// However, doing this means that the cancellation callback may run
// prior to returning below so in the case that cancellation may
// occur we defer setting the state to 'started' until after
// the operation has finished starting. The cancellation callback
// will only attempt to request cancellation of the operation with
// CancelIoEx() once the state has been set to 'started'.
const bool canBeCancelled = m_cancellationToken.can_be_cancelled();
if (canBeCancelled)
{
m_cancellationCallback.emplace(
std::move(m_cancellationToken),
[this] { this->on_cancellation_requested(); });
}
else
{
m_state.store(state::started, std::memory_order_relaxed);
}
// Now start the operation.
const bool willCompleteAsynchronously = static_cast<OPERATION*>(this)->try_start();
if (!willCompleteAsynchronously)
{
// Operation completed synchronously, resume awaiting coroutine immediately.
return false;
}
if (canBeCancelled)
{
// Need to flag that the operation has finished starting now.
// However, the operation may have completed concurrently on
// another thread, transitioning directly from not_started -> complete.
// Or it may have had the cancellation callback execute and transition
// from not_started -> cancellation_requested. We use a compare-exchange
// to determine a winner between these potential racing cases.
state oldState = state::not_started;
if (!m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire))
{
if (oldState == state::cancellation_requested)
{
// Request the operation be cancelled.
// Note that it may have already completed on a background
// thread by now so this request for cancellation may end up
// being ignored.
static_cast<OPERATION*>(this)->cancel();
if (!m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire))
{
assert(oldState == state::complete);
return false;
}
}
else
{
assert(oldState == state::complete);
return false;
}
}
}
return true;
}
decltype(auto) await_resume()
{
// Free memory used by the cancellation callback now that the operation
// has completed rather than waiting until the operation object destructs.
// eg. If the operation is passed to when_all() then the operation object
// may not be destructed until all of the operations complete.
m_cancellationCallback.reset();
if (m_errorCode == error_operation_aborted)
{
throw operation_cancelled{};
}
return static_cast<OPERATION*>(this)->get_result();
}
private:
enum class state
{
not_started,
started,
cancellation_requested,
completed
};
void on_cancellation_requested() noexcept
{
auto oldState = m_state.load(std::memory_order_acquire);
if (oldState == state::not_started)
{
// This callback is running concurrently with await_suspend().
// The call to start the operation may not have returned yet so
// we can't safely request cancellation of it. Instead we try to
// notify the await_suspend() thread by transitioning the state
// to state::cancellation_requested so that the await_suspend()
// thread can request cancellation after it has finished starting
// the operation.
const bool transferredCancelResponsibility =
m_state.compare_exchange_strong(
oldState,
state::cancellation_requested,
std::memory_order_release,
std::memory_order_acquire);
if (transferredCancelResponsibility)
{
return;
}
}
// No point requesting cancellation if the operation has already completed.
if (oldState != state::completed)
{
static_cast<OPERATION*>(this)->cancel();
}
}
static void on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
[[maybe_unused]] detail::win32::ulongptr_t completionKey) noexcept
{
auto* operation = static_cast<win32_overlapped_operation_cancellable*>(ioState);
operation->m_errorCode = errorCode;
operation->m_numberOfBytesTransferred = numberOfBytesTransferred;
auto state = operation->m_state.load(std::memory_order_acquire);
if (state == state::started)
{
operation->m_state.store(state::completed, std::memory_order_relaxed);
operation->m_awaitingCoroutine.resume();
}
else
{
// We are racing with await_suspend() call suspending.
// Try to mark it as completed using an atomic exchange and look
// at the previous value to determine whether the coroutine suspended
// first (in which case we resume it now) or we marked it as completed
// first (in which case await_suspend() will return false and immediately
// resume the coroutine).
state = operation->m_state.exchange(
state::completed,
std::memory_order_acq_rel);
if (state == state::started)
{
// The await_suspend() method returned (or will return) 'true' and so
// we need to resume the coroutine.
operation->m_awaitingCoroutine.resume();
}
}
}
std::atomic<state> m_state;
cppcoro::cancellation_token m_cancellationToken;
std::optional<cppcoro::cancellation_registration> m_cancellationCallback;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
};
}
}
#endif

View file

@ -15,76 +15,73 @@
#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
# include <cppcoro/detail/win32_overlapped_operation.hpp>
#endif
namespace cppcoro
{
class file_read_operation
#if CPPCORO_OS_WINNT
: private cppcoro::detail::win32::io_state
#endif
class file_read_operation
: public cppcoro::detail::win32_overlapped_operation<file_read_operation>
{
public:
#if CPPCORO_OS_WINNT
file_read_operation(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
void* buffer,
std::size_t byteCount,
cancellation_token cancellationToken) noexcept;
#endif
file_read_operation(file_read_operation&& other) noexcept;
file_read_operation(const file_read_operation& other) noexcept;
file_read_operation& operator=(const file_read_operation& other) = delete;
file_read_operation& operator=(file_read_operation&& other) = delete;
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter);
std::size_t await_resume();
std::size_t byteCount) noexcept
: cppcoro::detail::win32_overlapped_operation<file_read_operation>(fileOffset)
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
{}
private:
void on_cancellation_requested() noexcept;
friend class cppcoro::detail::win32_overlapped_operation<file_read_operation>;
#if CPPCORO_OS_WINNT
static void on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
detail::win32::ulongptr_t completionKey) noexcept;
#endif
bool try_start() noexcept;
enum class state
{
not_started,
started,
cancellation_requested,
complete
};
std::atomic<state> m_state;
#if CPPCORO_OS_WINNT
detail::win32::handle_t m_fileHandle;
#endif
void* m_buffer;
std::size_t m_byteCount;
cancellation_token m_cancellationToken;
std::experimental::coroutine_handle<> m_awaiter;
#if CPPCORO_OS_WINNT
detail::win32::dword_t m_errorCode;
detail::win32::dword_t m_numberOfBytesTransferred;
#endif
std::optional<cancellation_registration> m_cancellationRegistration;
};
class file_read_operation_cancellable
: public cppcoro::detail::win32_overlapped_operation_cancellable<file_read_operation_cancellable>
{
public:
file_read_operation_cancellable(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
void* buffer,
std::size_t byteCount,
cancellation_token&& cancellationToken) noexcept
: cppcoro::detail::win32_overlapped_operation_cancellable<file_read_operation_cancellable>(
fileOffset, std::move(cancellationToken))
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
{}
private:
friend class cppcoro::detail::win32_overlapped_operation_cancellable<file_read_operation_cancellable>;
bool try_start() noexcept;
void cancel() noexcept;
detail::win32::handle_t m_fileHandle;
void* m_buffer;
std::size_t m_byteCount;
};
#endif
}
#endif

View file

@ -15,76 +15,69 @@
#if CPPCORO_OS_WINNT
# include <cppcoro/detail/win32.hpp>
#endif
# include <cppcoro/detail/win32_overlapped_operation.hpp>
namespace cppcoro
{
class file_write_operation
#if CPPCORO_OS_WINNT
: private cppcoro::detail::win32::io_state
#endif
: public cppcoro::detail::win32_overlapped_operation<file_write_operation>
{
public:
#if CPPCORO_OS_WINNT
file_write_operation(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
const void* buffer,
std::size_t byteCount,
cancellation_token cancellationToken) noexcept;
#endif
file_write_operation(file_write_operation&& other) noexcept;
file_write_operation(const file_write_operation& other) noexcept;
file_write_operation& operator=(const file_write_operation& other) = delete;
file_write_operation& operator=(file_write_operation&& other) = delete;
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaiter);
std::size_t await_resume();
std::size_t byteCount) noexcept
: cppcoro::detail::win32_overlapped_operation<file_write_operation>(fileOffset)
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
{}
private:
void on_cancellation_requested() noexcept;
friend class cppcoro::detail::win32_overlapped_operation<file_write_operation>;
#if CPPCORO_OS_WINNT
static void on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
detail::win32::ulongptr_t completionKey) noexcept;
#endif
bool try_start() noexcept;
enum class state
{
not_started,
started,
cancellation_requested,
complete
};
std::atomic<state> m_state;
#if CPPCORO_OS_WINNT
detail::win32::handle_t m_fileHandle;
#endif
const void* m_buffer;
std::size_t m_byteCount;
cancellation_token m_cancellationToken;
std::experimental::coroutine_handle<> m_awaiter;
};
#if CPPCORO_OS_WINNT
detail::win32::dword_t m_errorCode;
detail::win32::dword_t m_numberOfBytesTransferred;
#endif
class file_write_operation_cancellable
: public cppcoro::detail::win32_overlapped_operation_cancellable<file_write_operation_cancellable>
{
public:
std::optional<cancellation_registration> m_cancellationRegistration;
file_write_operation_cancellable(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
const void* buffer,
std::size_t byteCount,
cancellation_token&& ct) noexcept
: cppcoro::detail::win32_overlapped_operation_cancellable<file_write_operation_cancellable>(fileOffset, std::move(ct))
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
{}
private:
friend class cppcoro::detail::win32_overlapped_operation<file_write_operation>;
bool try_start() noexcept;
void cancel() noexcept;
detail::win32::handle_t m_fileHandle;
const void* m_buffer;
std::size_t m_byteCount;
};
}
#endif // CPPCORO_OS_WINNT
#endif

View file

@ -45,10 +45,15 @@ namespace cppcoro
/// This object must be co_await'ed to start the read operation.
[[nodiscard]]
file_read_operation read(
std::uint64_t offset,
void* buffer,
std::size_t byteCount) const noexcept;
[[nodiscard]]
file_read_operation_cancellable read(
std::uint64_t offset,
void* buffer,
std::size_t byteCount,
cancellation_token ct = {}) const noexcept;
cancellation_token ct) const noexcept;
protected:

View file

@ -51,10 +51,15 @@ namespace cppcoro
/// This object must be co_await'ed to start the write operation.
[[nodiscard]]
file_write_operation write(
std::uint64_t offset,
const void* buffer,
std::size_t byteCount) noexcept;
[[nodiscard]]
file_write_operation_cancellable write(
std::uint64_t offset,
const void* buffer,
std::size_t byteCount,
cancellation_token ct = {}) noexcept;
cancellation_token ct) noexcept;
protected:

View file

@ -4,112 +4,13 @@
///////////////////////////////////////////////////////////////////////////////
#include <cppcoro/file_read_operation.hpp>
#include <cppcoro/operation_cancelled.hpp>
#include <system_error>
#include <utility>
#include <cassert>
#if CPPCORO_OS_WINNT
# define WIN32_LEAN_AND_MEAN
# include <Windows.h>
#endif
#if CPPCORO_OS_WINNT
cppcoro::file_read_operation::file_read_operation(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
void* buffer,
std::size_t byteCount,
cancellation_token cancellationToken) noexcept
: m_state(state::not_started)
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
, m_cancellationToken(std::move(cancellationToken))
, m_cancellationRegistration(std::nullopt)
bool cppcoro::file_read_operation::try_start() noexcept
{
this->Offset = static_cast<detail::win32::dword_t>(fileOffset);
this->OffsetHigh = static_cast<detail::win32::dword_t>(fileOffset >> 32);
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
cppcoro::file_read_operation::file_read_operation(
file_read_operation&& other) noexcept
: m_state(state::not_started)
, m_fileHandle(other.m_fileHandle)
, m_buffer(other.m_buffer)
, m_byteCount(other.m_byteCount)
, m_cancellationToken(std::move(other.m_cancellationToken))
, m_cancellationRegistration(std::nullopt)
{
this->Offset = other.Offset;
this->OffsetHigh = other.OffsetHigh;
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
cppcoro::file_read_operation::file_read_operation(
const file_read_operation& other) noexcept
: m_state(state::not_started)
, m_fileHandle(other.m_fileHandle)
, m_buffer(other.m_buffer)
, m_byteCount(other.m_byteCount)
, m_cancellationToken(other.m_cancellationToken)
, m_cancellationRegistration(std::nullopt)
{
this->Offset = other.Offset;
this->OffsetHigh = other.OffsetHigh;
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
bool cppcoro::file_read_operation::await_ready() const noexcept
{
return m_state.load(std::memory_order_acquire) == state::complete;
}
bool cppcoro::file_read_operation::await_suspend(
std::experimental::coroutine_handle<> awaiter)
{
m_awaiter = awaiter;
this->hEvent = nullptr;
this->m_callback = &file_read_operation::on_operation_completed;
const bool enableCancellation = m_cancellationToken.can_be_cancelled();
if (enableCancellation)
{
// Registering a cancellation callback can throw std::bad_alloc in
// low-memory situations so we want to do this before we start the
// I/O operation below as we need it to be noexcept after the
// operation has started.
m_cancellationRegistration.emplace(
std::move(m_cancellationToken),
[this] { this->on_cancellation_requested(); });
}
else
{
m_state.store(state::started, std::memory_order_relaxed);
}
const DWORD numberOfBytesToRead =
m_byteCount <= 0xFFFFFFFF ?
static_cast<DWORD>(m_byteCount) : DWORD(0xFFFFFFFF);
@ -119,8 +20,7 @@ bool cppcoro::file_read_operation::await_suspend(
m_buffer,
numberOfBytesToRead,
nullptr,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
get_overlapped());
const DWORD errorCode = ok ? ERROR_SUCCESS : ::GetLastError();
if (errorCode != ERROR_IO_PENDING)
{
@ -135,8 +35,7 @@ bool cppcoro::file_read_operation::await_suspend(
ok = ::GetOverlappedResult(
m_fileHandle,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)),
get_overlapped(),
&m_numberOfBytesTransferred,
FALSE);
if (!ok)
@ -144,147 +43,55 @@ bool cppcoro::file_read_operation::await_suspend(
m_numberOfBytesTransferred = 0;
}
m_state.store(state::complete, std::memory_order_relaxed);
return false;
}
if (enableCancellation)
{
// Now that the I/O operation has been started we need to update
// the state to reflect that. However, we now have the potential
// for two other threads to be concurrently trying to update the
// state: a thread requesting cancellation that is trying to
// transition not_started -> cancellation_requested, and an I/O
// thread that is handling a completion event that is trying to
// transition not_started -> complete.
//
// We need to use a compare-exchange operation to determine the
// winner of these potential racing cases.
// See also on_cancellation_requested() and on_operation_completed().
state oldState = state::not_started;
bool succeeded = m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire);
if (!succeeded)
{
if (oldState == state::cancellation_requested)
{
// Another thread requested cancellation and has executed the
// on_cancellation_requested() callback concurrently and has
// handed off responsibility for cancelling the operation to us.
::CancelIoEx(
m_fileHandle,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
// We might still be racing with an I/O thread that is processing
// completion of this operation concurrently so we need to use
// another compare-exchange operation to decide who wins.
succeeded = m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire);
if (succeeded)
{
return true;
}
}
assert(oldState == state::complete);
// The operation completed concurrently on another thread before we could
// transition to the 'started' state so we'll return as if we completed
// synchronously.
return false;
}
}
return true;
}
std::size_t cppcoro::file_read_operation::await_resume()
bool cppcoro::file_read_operation_cancellable::try_start() noexcept
{
if (m_errorCode != ERROR_SUCCESS)
const DWORD numberOfBytesToRead =
m_byteCount <= 0xFFFFFFFF ?
static_cast<DWORD>(m_byteCount) : DWORD(0xFFFFFFFF);
BOOL ok = ::ReadFile(
m_fileHandle,
m_buffer,
numberOfBytesToRead,
nullptr,
get_overlapped());
const DWORD errorCode = ok ? ERROR_SUCCESS : ::GetLastError();
if (errorCode != ERROR_IO_PENDING)
{
if (m_errorCode == ERROR_OPERATION_ABORTED)
{
throw operation_cancelled{};
}
// Completed synchronously.
//
// We are assuming that the file-handle has been set to the
// mode where synchronous completions do not post a completion
// event to the I/O completion port and thus can return without
// suspending here.
throw std::system_error
{
static_cast<int>(m_errorCode),
std::system_category(),
"file read error"
};
}
this->m_errorCode = errorCode;
return m_numberOfBytesTransferred;
}
void cppcoro::file_read_operation::on_cancellation_requested() noexcept
{
auto oldState = m_state.load(std::memory_order_acquire);
if (oldState == state::not_started)
{
// This callback is running concurrently with await_suspend().
// The call to ReadFile() may not have returned yet so we can't call
// CancelIoEx().
// Try to notify the await_suspend() thread by transitioning to
// state::cancellation_requested so that it can call CancelIoEx()
// once ReadFile() returns.
const bool transferredCancelResponsibility =
m_state.compare_exchange_strong(
oldState,
state::cancellation_requested,
std::memory_order_release,
std::memory_order_acquire);
if (transferredCancelResponsibility)
{
return;
}
}
if (oldState != state::complete)
{
::CancelIoEx(
ok = ::GetOverlappedResult(
m_fileHandle,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
get_overlapped(),
&m_numberOfBytesTransferred,
FALSE);
if (!ok)
{
m_numberOfBytesTransferred = 0;
}
return false;
}
return true;
}
void cppcoro::file_read_operation::on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
[[maybe_unused]] detail::win32::ulongptr_t completionKey) noexcept
void cppcoro::file_read_operation_cancellable::cancel() noexcept
{
auto* operation = static_cast<file_read_operation*>(ioState);
operation->m_errorCode = errorCode;
operation->m_numberOfBytesTransferred = numberOfBytesTransferred;
auto state = operation->m_state.load(std::memory_order_acquire);
if (state == state::started)
{
operation->m_state.store(state::complete, std::memory_order_release);
operation->m_awaiter.resume();
}
else
{
state = operation->m_state.exchange(
state::complete,
std::memory_order_acq_rel);
if (state == state::started)
{
operation->m_awaiter.resume();
}
}
(void)::CancelIoEx(m_fileHandle, this->get_overlapped());
}
#endif // CPPCORO_OS_WINNT

View file

@ -4,112 +4,13 @@
///////////////////////////////////////////////////////////////////////////////
#include <cppcoro/file_write_operation.hpp>
#include <cppcoro/operation_cancelled.hpp>
#include <system_error>
#include <utility>
#include <cassert>
#if CPPCORO_OS_WINNT
# define WIN32_LEAN_AND_MEAN
# include <Windows.h>
#endif
#if CPPCORO_OS_WINNT
cppcoro::file_write_operation::file_write_operation(
detail::win32::handle_t fileHandle,
std::uint64_t fileOffset,
const void* buffer,
std::size_t byteCount,
cancellation_token cancellationToken) noexcept
: m_state(state::not_started)
, m_fileHandle(fileHandle)
, m_buffer(buffer)
, m_byteCount(byteCount)
, m_cancellationToken(std::move(cancellationToken))
, m_cancellationRegistration(std::nullopt)
bool cppcoro::file_write_operation::try_start() noexcept
{
this->Offset = static_cast<detail::win32::dword_t>(fileOffset);
this->OffsetHigh = static_cast<detail::win32::dword_t>(fileOffset >> 32);
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
cppcoro::file_write_operation::file_write_operation(
file_write_operation&& other) noexcept
: m_state(state::not_started)
, m_fileHandle(other.m_fileHandle)
, m_buffer(other.m_buffer)
, m_byteCount(other.m_byteCount)
, m_cancellationToken(std::move(other.m_cancellationToken))
, m_cancellationRegistration(std::nullopt)
{
this->Offset = other.Offset;
this->OffsetHigh = other.OffsetHigh;
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
cppcoro::file_write_operation::file_write_operation(
const file_write_operation& other) noexcept
: m_state(state::not_started)
, m_fileHandle(other.m_fileHandle)
, m_buffer(other.m_buffer)
, m_byteCount(other.m_byteCount)
, m_cancellationToken(other.m_cancellationToken)
, m_cancellationRegistration(std::nullopt)
{
this->Offset = other.Offset;
this->OffsetHigh = other.OffsetHigh;
if (m_cancellationToken.is_cancellation_requested())
{
m_state.store(state::complete, std::memory_order_relaxed);
m_numberOfBytesTransferred = 0;
m_errorCode = ERROR_OPERATION_ABORTED;
}
}
bool cppcoro::file_write_operation::await_ready() const noexcept
{
return m_state.load(std::memory_order_acquire) == state::complete;
}
bool cppcoro::file_write_operation::await_suspend(
std::experimental::coroutine_handle<> awaiter)
{
m_awaiter = awaiter;
this->hEvent = nullptr;
this->m_callback = &file_write_operation::on_operation_completed;
const bool enableCancellation = m_cancellationToken.can_be_cancelled();
if (enableCancellation)
{
// Registering a cancellation callback can throw std::bad_alloc in
// low-memory situations so we want to do this before we start the
// I/O operation below as we need it to be noexcept after the
// operation has started.
m_cancellationRegistration.emplace(
std::move(m_cancellationToken),
[this] { this->on_cancellation_requested(); });
}
else
{
m_state.store(state::started, std::memory_order_relaxed);
}
const DWORD numberOfBytesToWrite =
m_byteCount <= 0xFFFFFFFF ?
static_cast<DWORD>(m_byteCount) : DWORD(0xFFFFFFFF);
@ -144,147 +45,57 @@ bool cppcoro::file_write_operation::await_suspend(
m_numberOfBytesTransferred = 0;
}
m_state.store(state::complete, std::memory_order_relaxed);
return false;
}
if (enableCancellation)
{
// Now that the I/O operation has been started we need to update
// the state to reflect that. However, we now have the potential
// for two other threads to be concurrently trying to update the
// state: a thread requesting cancellation that is trying to
// transition not_started -> cancellation_requested, and an I/O
// thread that is handling a completion event that is trying to
// transition not_started -> complete.
//
// We need to use a compare-exchange operation to determine the
// winner of these potential racing cases.
// See also on_cancellation_requested() and on_operation_completed().
state oldState = state::not_started;
bool succeeded = m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire);
if (!succeeded)
{
if (oldState == state::cancellation_requested)
{
// Another thread requested cancellation and has executed the
// on_cancellation_requested() callback concurrently and has
// handed off responsibility for cancelling the operation to us.
::CancelIoEx(
m_fileHandle,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
// We might still be racing with an I/O thread that is processing
// completion of this operation concurrently so we need to use
// another compare-exchange operation to decide who wins.
succeeded = m_state.compare_exchange_strong(
oldState,
state::started,
std::memory_order_release,
std::memory_order_acquire);
if (succeeded)
{
return true;
}
}
assert(oldState == state::complete);
// The operation completed concurrently on another thread before we could
// transition to the 'started' state so we'll return as if we completed
// synchronously.
return false;
}
}
return true;
}
std::size_t cppcoro::file_write_operation::await_resume()
bool cppcoro::file_write_operation_cancellable::try_start() noexcept
{
if (m_errorCode != ERROR_SUCCESS)
const DWORD numberOfBytesToWrite =
m_byteCount <= 0xFFFFFFFF ?
static_cast<DWORD>(m_byteCount) : DWORD(0xFFFFFFFF);
BOOL ok = ::WriteFile(
m_fileHandle,
m_buffer,
numberOfBytesToWrite,
nullptr,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
const DWORD errorCode = ok ? ERROR_SUCCESS : ::GetLastError();
if (errorCode != ERROR_IO_PENDING)
{
if (m_errorCode == ERROR_OPERATION_ABORTED)
{
throw operation_cancelled{};
}
// Completed synchronously.
//
// We are assuming that the file-handle has been set to the
// mode where synchronous completions do not post a completion
// event to the I/O completion port and thus can return without
// suspending here.
throw std::system_error
{
static_cast<int>(m_errorCode),
std::system_category(),
"file write error"
};
}
m_errorCode = errorCode;
return m_numberOfBytesTransferred;
}
void cppcoro::file_write_operation::on_cancellation_requested() noexcept
{
auto oldState = m_state.load(std::memory_order_acquire);
if (oldState == state::not_started)
{
// This callback is running concurrently with await_suspend().
// The call to WriteFile() may not have returned yet so we can't call
// CancelIoEx().
// Try to notify the await_suspend() thread by transitioning to
// state::cancellation_requested so that it can call CancelIoEx()
// once WriteFile() returns.
const bool transferredCancelResponsibility =
m_state.compare_exchange_strong(
oldState,
state::cancellation_requested,
std::memory_order_release,
std::memory_order_acquire);
if (transferredCancelResponsibility)
{
return;
}
}
if (oldState != state::complete)
{
::CancelIoEx(
ok = ::GetOverlappedResult(
m_fileHandle,
reinterpret_cast<LPOVERLAPPED>(
static_cast<detail::win32::io_state*>(this)));
static_cast<detail::win32::io_state*>(this)),
&m_numberOfBytesTransferred,
FALSE);
if (!ok)
{
m_numberOfBytesTransferred = 0;
}
return false;
}
return true;
}
void cppcoro::file_write_operation::on_operation_completed(
detail::win32::io_state* ioState,
detail::win32::dword_t errorCode,
detail::win32::dword_t numberOfBytesTransferred,
[[maybe_unused]] detail::win32::ulongptr_t completionKey) noexcept
void cppcoro::file_write_operation_cancellable::cancel() noexcept
{
auto* operation = static_cast<file_write_operation*>(ioState);
operation->m_errorCode = errorCode;
operation->m_numberOfBytesTransferred = numberOfBytesTransferred;
auto state = operation->m_state.load(std::memory_order_acquire);
if (state == state::started)
{
operation->m_state.store(state::complete, std::memory_order_release);
operation->m_awaiter.resume();
}
else
{
state = operation->m_state.exchange(
state::complete,
std::memory_order_acq_rel);
if (state == state::started)
{
operation->m_awaiter.resume();
}
}
(void)::CancelIoEx(m_fileHandle, get_overlapped());
}
#endif // CPPCORO_OS_WINNT

View file

@ -8,12 +8,24 @@
#if CPPCORO_OS_WINNT
cppcoro::file_read_operation cppcoro::readable_file::read(
std::uint64_t offset,
void* buffer,
std::size_t byteCount) const noexcept
{
return file_read_operation(
m_fileHandle.handle(),
offset,
buffer,
byteCount);
}
cppcoro::file_read_operation_cancellable cppcoro::readable_file::read(
std::uint64_t offset,
void* buffer,
std::size_t byteCount,
cancellation_token ct) const noexcept
{
return file_read_operation(
return file_read_operation_cancellable(
m_fileHandle.handle(),
offset,
buffer,

View file

@ -43,12 +43,25 @@ void cppcoro::writable_file::set_size(
}
cppcoro::file_write_operation cppcoro::writable_file::write(
std::uint64_t offset,
const void* buffer,
std::size_t byteCount) noexcept
{
return file_write_operation{
m_fileHandle.handle(),
offset,
buffer,
byteCount
};
}
cppcoro::file_write_operation_cancellable cppcoro::writable_file::write(
std::uint64_t offset,
const void* buffer,
std::size_t byteCount,
cancellation_token ct) noexcept
{
return file_write_operation{
return file_write_operation_cancellable{
m_fileHandle.handle(),
offset,
buffer,

View file

@ -9,6 +9,8 @@
#include <cppcoro/read_write_file.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/when_all.hpp>
#include <cppcoro/cancellation_source.hpp>
#include <cppcoro/on_scope_exit.hpp>
#include <random>
@ -138,7 +140,7 @@ TEST_CASE_FIXTURE(temp_dir_fixture, "read write file")
co_await f.read(0, buffer2, 50);
CHECK(std::memcmp(buffer1, buffer2, 50) == 0);
co_await f.read(50, buffer2, 50);
CHECK(std::memcmp(buffer1 + 50, buffer2, 50) == 0);
};
@ -146,4 +148,48 @@ TEST_CASE_FIXTURE(temp_dir_fixture, "read write file")
cppcoro::sync_wait(run());
}
TEST_CASE_FIXTURE(temp_dir_fixture, "cancel read")
{
cppcoro::sync_wait([&]() -> cppcoro::task<>
{
cppcoro::io_work_scope ioScope{ io_service() };
auto f = cppcoro::read_write_file::open(io_service(), temp_dir() / "foo.txt");
f.set_size(20 * 1024 * 1024);
cppcoro::cancellation_source canceller;
try
{
(void)co_await cppcoro::when_all(
[&]() -> cppcoro::task<int>
{
const auto fileSize = f.size();
const std::size_t bufferSize = 64 * 1024;
auto buffer = std::make_unique<std::uint8_t[]>(bufferSize);
std::uint64_t offset = 0;
while (offset < fileSize)
{
auto bytesRead = co_await f.read(offset, buffer.get(), bufferSize, canceller.token());
offset += bytesRead;
}
WARN("should have been cancelled");
co_return 0;
}(),
[&]() -> cppcoro::task<int>
{
using namespace std::chrono_literals;
co_await io_service().schedule_after(1ms);
canceller.request_cancellation();
co_return 0;
}());
WARN("Expected exception to be thrown");
}
catch (const cppcoro::operation_cancelled&)
{
}
}());
}
TEST_SUITE_END();