From 42509cf3a72b79cb9109debf96dba47a9aa08c98 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Fri, 22 May 2026 21:05:33 +0200 Subject: [PATCH 1/2] feat(native): complete async-op shadowing across all I/O types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the shadow pattern (concrete-class method names hidden by derived native_* templates that return strongly-typed awaitables instead of dispatching virtually) so every async operation on every public I/O type has a native_* counterpart. Before this branch, only a subset of the async surface was shadowed. native_tcp_socket and native_tcp_acceptor shadowed their read/write/accept/connect ops, but native_udp_socket covered only send_to/recv_from, and the local-socket and file types had no shadow at all. Code holding a native_X reference therefore went through the virtual io_object dispatch for any non-shadowed call, defeating the point of using the typed wrapper. This commit closes the gap: - native_udp_socket gains connect, send, recv shadows. - New native_local_stream_socket shadows read_some, write_some, connect. - New native_local_stream_acceptor shadows accept(peer&) and the move-accept overload (returning a native_local_stream_socket). - New native_local_datagram_socket shadows send_to, recv_from, connect, send, recv. - New native_stream_file shadows read_some, write_some. - New native_random_access_file shadows read_some_at, write_some_at. - Every shadow type also gets wait() returning a typed native_wait_awaitable. Tests under test/unit/native/ exercise each new shadow with a static_assert that pins the shadowing contract (the native op must return a type distinct from the concrete base op), plus runtime checks of the awaitable path and a polymorphic-slice test that verifies the base class still works via virtual dispatch when the object is used through its non-native interface. Supporting bits: - backend.hpp gains the file-type tag typedefs (stream_file_type, random_access_file_type, and *_service_type) for every backend so create_handle(ctx) compiles in the new native_*_file wrappers. - stream_file and random_access_file grow protected constructors so the native virtual-base initialization works (io_stream virtually inherits io_object; only the most-derived class initializes it). - local_stream_acceptor::bind now honors bind_option::unlink_existing on Windows via DeleteFileA — the option was previously a no-op in the non-POSIX branch, which broke testUnlinkExisting on the iocp variant. - win_tcp_service::connect_ex / accept_ex getters move into the class body so TUs that include only the service header (the new native_local_stream_socket test among them) get the inline definitions. - win_local_stream_service.hpp pulls in the acceptor header directly so its inline shutdown() sees the full win_local_stream_acceptor_internal type regardless of consumer include order. - A portable test/unit/local_temp.hpp helper replaces the old POSIX-only mkdtemp/unlink pattern in the local-socket tests, using std::filesystem + a random_device-seeded RNG so parallel ctest processes don't collide on /tmp paths. Known platform gaps documented in-file: - local_datagram_socket tests stay POSIX-only at the top-of-file. Windows has never shipped AF_UNIX SOCK_DGRAM; the very first WSASocket(AF_UNIX, SOCK_DGRAM, ...) returns WSAESOCKTNOSUPPORT. - local_stream_socket's three socketpair-based tests (testReadWrite, testSocketPair, testAvailable) and the raw-fd testRelease remain per-test POSIX-gated; make_local_stream_pair is gated POSIX-only in the public header because socketpair() doesn't exist on Windows. TCP socket tests already exercise the equivalent read/write paths on Windows IOCP. --- include/boost/corosio/backend.hpp | 32 ++ .../detail/iocp/win_local_stream_service.hpp | 1 + .../detail/iocp/win_tcp_acceptor_service.hpp | 12 - .../native/detail/iocp/win_tcp_service.hpp | 10 +- include/boost/corosio/native/native.hpp | 6 + .../native/native_local_datagram_socket.hpp | 489 ++++++++++++++++++ .../native/native_local_stream_acceptor.hpp | 287 ++++++++++ .../native/native_local_stream_socket.hpp | 332 ++++++++++++ .../native/native_random_access_file.hpp | 228 ++++++++ .../corosio/native/native_stream_file.hpp | 214 ++++++++ .../corosio/native/native_tcp_acceptor.hpp | 48 ++ .../corosio/native/native_tcp_socket.hpp | 48 ++ .../corosio/native/native_udp_socket.hpp | 251 ++++++++- include/boost/corosio/random_access_file.hpp | 4 + include/boost/corosio/stream_file.hpp | 7 + .../boost/corosio/test/local_socket_pair.hpp | 142 +++++ .../corosio/local_socket_latency_bench.cpp | 42 +- .../corosio/local_socket_throughput_bench.cpp | 28 +- src/corosio/src/local_stream_acceptor.cpp | 18 +- test/unit/local_datagram_socket.cpp | 48 +- test/unit/local_stream_socket.cpp | 60 +-- test/unit/local_temp.hpp | 68 +++ .../native/native_local_datagram_socket.cpp | 313 +++++++++++ .../native/native_local_stream_socket.cpp | 359 +++++++++++++ .../unit/native/native_random_access_file.cpp | 205 ++++++++ test/unit/native/native_resolver.cpp | 23 + test/unit/native/native_signal_set.cpp | 8 + test/unit/native/native_stream_file.cpp | 199 +++++++ test/unit/native/native_tcp_acceptor.cpp | 64 +++ test/unit/native/native_tcp_socket.cpp | 68 +++ test/unit/native/native_timer.cpp | 8 + test/unit/native/native_udp_socket.cpp | 171 ++++++ 32 files changed, 3684 insertions(+), 109 deletions(-) create mode 100644 include/boost/corosio/native/native_local_datagram_socket.hpp create mode 100644 include/boost/corosio/native/native_local_stream_acceptor.hpp create mode 100644 include/boost/corosio/native/native_local_stream_socket.hpp create mode 100644 include/boost/corosio/native/native_random_access_file.hpp create mode 100644 include/boost/corosio/native/native_stream_file.hpp create mode 100644 include/boost/corosio/test/local_socket_pair.hpp create mode 100644 test/unit/local_temp.hpp create mode 100644 test/unit/native/native_local_datagram_socket.cpp create mode 100644 test/unit/native/native_local_stream_socket.cpp create mode 100644 test/unit/native/native_random_access_file.cpp create mode 100644 test/unit/native/native_stream_file.cpp diff --git a/include/boost/corosio/backend.hpp b/include/boost/corosio/backend.hpp index e1d52ee09..ea14c1a91 100644 --- a/include/boost/corosio/backend.hpp +++ b/include/boost/corosio/backend.hpp @@ -45,6 +45,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -71,6 +75,11 @@ struct epoll_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -103,6 +112,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -129,6 +142,11 @@ struct select_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -161,6 +179,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -187,6 +209,11 @@ struct kqueue_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -260,6 +287,11 @@ struct iocp_t using resolver_type = detail::win_resolver; using resolver_service_type = detail::win_resolver_service; + using stream_file_type = detail::win_stream_file; + using stream_file_service_type = detail::win_file_service; + using random_access_file_type = detail::win_random_access_file; + using random_access_file_service_type = detail::win_random_access_file_service; + /** Create the scheduler and services for this backend. @param ctx The execution context that owns the scheduler. diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp index c4a32b06d..9cc628981 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp index ff76ea075..3bb98852a 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp @@ -1101,18 +1101,6 @@ win_tcp_service::native_handle() const noexcept return iocp_; } -inline LPFN_CONNECTEX -win_tcp_service::connect_ex() const noexcept -{ - return connect_ex_; -} - -inline LPFN_ACCEPTEX -win_tcp_service::accept_ex() const noexcept -{ - return accept_ex_; -} - inline void win_tcp_service::post(overlapped_op* op) { diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp index 15237606c..38e818503 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp @@ -152,10 +152,16 @@ class BOOST_COROSIO_DECL win_tcp_service final void* native_handle() const noexcept; /** Return the ConnectEx function pointer. */ - LPFN_CONNECTEX connect_ex() const noexcept; + LPFN_CONNECTEX connect_ex() const noexcept + { + return connect_ex_; + } /** Return the AcceptEx function pointer. */ - LPFN_ACCEPTEX accept_ex() const noexcept; + LPFN_ACCEPTEX accept_ex() const noexcept + { + return accept_ex_; + } /** Post an overlapped operation for completion. */ void post(overlapped_op* op); diff --git a/include/boost/corosio/native/native.hpp b/include/boost/corosio/native/native.hpp index 5c8b3aeae..a5b277e3c 100644 --- a/include/boost/corosio/native/native.hpp +++ b/include/boost/corosio/native/native.hpp @@ -19,10 +19,16 @@ #include #include +#include +#include +#include +#include #include #include +#include #include #include #include +#include #endif diff --git a/include/boost/corosio/native/native_local_datagram_socket.hpp b/include/boost/corosio/native/native_local_datagram_socket.hpp new file mode 100644 index 000000000..e603d6161 --- /dev/null +++ b/include/boost/corosio/native/native_local_datagram_socket.hpp @@ -0,0 +1,489 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix datagram socket with devirtualized I/O. + + This class template inherits from @ref local_datagram_socket + and shadows the async operations (`send_to`, `recv_from`, + `connect`, `send`, `recv`) with versions that call the backend + implementation directly, allowing the compiler to inline + through the entire call chain. + + Non-async operations (`open`, `close`, `cancel`, `bind`, + socket options) remain unchanged and dispatch through the + compiled library. + + A `native_local_datagram_socket` IS-A `local_datagram_socket` + and can be passed to any function expecting + `local_datagram_socket&`, in which case virtual dispatch is + used transparently. + + @tparam Backend A backend tag value (e.g., `epoll`) whose type + provides the concrete implementation types. + + @par Thread Safety + Same as @ref local_datagram_socket. + + @par Example + @code + #include + + native_io_context ctx; + native_local_datagram_socket s(ctx); + s.open(); + s.bind(local_endpoint("/tmp/recv.sock")); + char buf[1024]; + local_endpoint sender; + auto [ec, n] = co_await s.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), sender); + @endcode + + @see local_datagram_socket, epoll_t, iocp_t +*/ +template +class native_local_datagram_socket : public local_datagram_socket +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_datagram_socket_type; + using service_type = typename backend_type::local_datagram_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_send_to_awaitable + { + native_local_datagram_socket& self_; + ConstBufferSequence buffers_; + corosio::local_endpoint dest_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_to_awaitable( + native_local_datagram_socket& self, + ConstBufferSequence buffers, + corosio::local_endpoint dest, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , dest_(dest) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send_to( + h, env->executor, buffers_, dest_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_from_awaitable + { + native_local_datagram_socket& self_; + MutableBufferSequence buffers_; + corosio::local_endpoint& source_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_from_awaitable( + native_local_datagram_socket& self, + MutableBufferSequence buffers, + corosio::local_endpoint& source, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , source_(source) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv_from( + h, env->executor, buffers_, &source_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + struct native_wait_awaitable + { + native_local_datagram_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_datagram_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_local_datagram_socket& self_; + corosio::local_endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable( + native_local_datagram_socket& self, + corosio::local_endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + + template + struct native_send_awaitable + { + native_local_datagram_socket& self_; + ConstBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_awaitable( + native_local_datagram_socket& self, + ConstBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_awaitable + { + native_local_datagram_socket& self_; + MutableBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_awaitable( + native_local_datagram_socket& self, + MutableBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native socket from an execution context. + + @param ctx The execution context that will own this socket. + */ + explicit native_local_datagram_socket(capy::execution_context& ctx) + : local_datagram_socket(create_handle(ctx)) + { + } + + /** Construct a native socket from an executor. + + @param ex The executor whose context will own the socket. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_datagram_socket>) && + capy::Executor + explicit native_local_datagram_socket(Ex const& ex) + : native_local_datagram_socket(ex.context()) + { + } + + /// Move construct. + native_local_datagram_socket(native_local_datagram_socket&&) noexcept = + default; + + /// Move assign. + native_local_datagram_socket& + operator=(native_local_datagram_socket&&) noexcept = default; + + native_local_datagram_socket(native_local_datagram_socket const&) = delete; + native_local_datagram_socket& + operator=(native_local_datagram_socket const&) = delete; + + /** Send a datagram to the specified destination. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::send_to. + */ + template + auto send_to( + CB const& buffers, + corosio::local_endpoint dest, + corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send_to: socket not open"); + return native_send_to_awaitable( + *this, buffers, dest, static_cast(flags)); + } + + /// @overload + template + auto send_to(CB const& buffers, corosio::local_endpoint dest) + { + return send_to(buffers, dest, corosio::message_flags::none); + } + + /** Receive a datagram and capture the sender's endpoint. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::recv_from. + */ + template + auto recv_from( + MB const& buffers, + corosio::local_endpoint& source, + corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv_from: socket not open"); + return native_recv_from_awaitable( + *this, buffers, source, static_cast(flags)); + } + + /// @overload + template + auto recv_from(MB const& buffers, corosio::local_endpoint& source) + { + return recv_from(buffers, source, corosio::message_flags::none); + } + + /** Asynchronously connect to set the default peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::connect. + + If the socket is not already open, it is opened automatically. + */ + auto connect(corosio::local_endpoint ep) + { + if (!is_open()) + open(); + return native_connect_awaitable(*this, ep); + } + + /** Send a datagram to the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::send. + */ + template + auto send(CB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send: socket not open"); + return native_send_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto send(CB const& buffers) + { + return send(buffers, corosio::message_flags::none); + } + + /** Receive a datagram from the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::recv. + */ + template + auto recv(MB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv: socket not open"); + return native_recv_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto recv(MB const& buffers) + { + return recv(buffers, corosio::message_flags::none); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP diff --git a/include/boost/corosio/native/native_local_stream_acceptor.hpp b/include/boost/corosio/native/native_local_stream_acceptor.hpp new file mode 100644 index 000000000..963ba3780 --- /dev/null +++ b/include/boost/corosio/native/native_local_stream_acceptor.hpp @@ -0,0 +1,287 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP + +#include +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix stream acceptor with devirtualized accept. + + This class template inherits from @ref local_stream_acceptor + and shadows both `accept` overloads (the peer-reference form + and the move-return form) with versions that call the backend + implementation directly, allowing the compiler to inline + through the entire call chain. The move-return form yields a + @ref native_local_stream_socket so subsequent I/O on the peer + is also devirtualized. + + Non-async operations (`listen`, `close`, `cancel`) remain + unchanged and dispatch through the compiled library. + + A `native_local_stream_acceptor` IS-A `local_stream_acceptor` + and can be passed to any function expecting + `local_stream_acceptor&`. + + @tparam Backend A backend tag value (e.g., `epoll`). + + @par Thread Safety + Same as @ref local_stream_acceptor. + + @see local_stream_acceptor, epoll_t, iocp_t +*/ +template +class native_local_stream_acceptor : public local_stream_acceptor +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_stream_acceptor_type; + using service_type = + typename backend_type::local_stream_acceptor_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + struct native_wait_awaitable + { + native_local_stream_acceptor& acc_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_stream_acceptor& acc, wait_type w) noexcept + : acc_(acc) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_accept_awaitable + { + native_local_stream_acceptor& acc_; + local_stream_socket& peer_; + std::stop_token token_; + mutable std::error_code ec_; + mutable io_object::implementation* peer_impl_ = nullptr; + + native_accept_awaitable( + native_local_stream_acceptor& acc, + local_stream_socket& peer) noexcept + : acc_(acc) + , peer_(peer) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + if (!ec_) + acc_.reset_peer_impl(peer_, peer_impl_); + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().accept( + h, env->executor, token_, &ec_, &peer_impl_); + } + }; + + struct native_move_accept_awaitable + { + native_local_stream_acceptor& acc_; + std::stop_token token_; + mutable std::error_code ec_; + mutable io_object::implementation* peer_impl_ = nullptr; + + explicit native_move_accept_awaitable( + native_local_stream_acceptor& acc) noexcept + : acc_(acc) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result> + await_resume() const noexcept + { + if (token_.stop_requested()) + return { + make_error_code(std::errc::operation_canceled), + native_local_stream_socket(acc_.context())}; + if (ec_ || !peer_impl_) + return { + ec_, + native_local_stream_socket(acc_.context())}; + + native_local_stream_socket peer(acc_.context()); + acc_.reset_peer_impl(peer, peer_impl_); + return {ec_, std::move(peer)}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().accept( + h, env->executor, token_, &ec_, &peer_impl_); + } + }; + +public: + /** Construct a native acceptor from an execution context. + + @param ctx The execution context that will own this acceptor. + */ + explicit native_local_stream_acceptor(capy::execution_context& ctx) + : local_stream_acceptor(create_handle(ctx), ctx) + { + } + + /** Construct a native acceptor from an executor. + + @param ex The executor whose context will own the acceptor. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_stream_acceptor>) && + capy::Executor + explicit native_local_stream_acceptor(Ex const& ex) + : native_local_stream_acceptor(ex.context()) + { + } + + /// Move construct. + native_local_stream_acceptor(native_local_stream_acceptor&&) noexcept = + default; + + /// Move assign. + native_local_stream_acceptor& + operator=(native_local_stream_acceptor&&) noexcept = default; + + native_local_stream_acceptor(native_local_stream_acceptor const&) = delete; + native_local_stream_acceptor& + operator=(native_local_stream_acceptor const&) = delete; + + /** Asynchronously accept an incoming connection. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_acceptor::accept. + + @param peer The socket to receive the accepted connection. + + @return An awaitable yielding `io_result<>`. + + @throws std::logic_error if the acceptor is not listening. + + Both this acceptor and @p peer must outlive the returned + awaitable. + */ + auto accept(local_stream_socket& peer) + { + if (!is_open()) + detail::throw_logic_error("accept: acceptor not listening"); + return native_accept_awaitable(*this, peer); + } + + /** Asynchronously accept an incoming connection, returning the peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. The accepted peer is returned as a + @ref native_local_stream_socket so that subsequent I/O on it + is also devirtualized. + + @return An awaitable yielding + `io_result>`. + + @throws std::logic_error if the acceptor is not listening. + + This acceptor must outlive the returned awaitable. + */ + auto accept() + { + if (!is_open()) + detail::throw_logic_error("accept: acceptor not listening"); + return native_move_accept_awaitable(*this); + } + + /** Asynchronously wait for the acceptor to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_acceptor::wait. + + @param w The wait direction (typically `wait_type::read`). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP diff --git a/include/boost/corosio/native/native_local_stream_socket.hpp b/include/boost/corosio/native/native_local_stream_socket.hpp new file mode 100644 index 000000000..9bf2eeaef --- /dev/null +++ b/include/boost/corosio/native/native_local_stream_socket.hpp @@ -0,0 +1,332 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix stream socket with devirtualized I/O operations. + + This class template inherits from @ref local_stream_socket and + shadows the async operations (`read_some`, `write_some`, + `connect`) with versions that call the backend implementation + directly, allowing the compiler to inline through the entire + call chain. + + Non-async operations (`open`, `close`, `cancel`, socket options) + remain unchanged and dispatch through the compiled library. + + A `native_local_stream_socket` IS-A `local_stream_socket` and + can be passed to any function expecting `local_stream_socket&` + or `io_stream&`, in which case virtual dispatch is used + transparently. + + @tparam Backend A backend tag value (e.g., `epoll`) whose type + provides the concrete implementation types. + + @par Thread Safety + Same as @ref local_stream_socket. + + @par Example + @code + #include + + native_io_context ctx; + native_local_stream_socket s(ctx); + s.open(); + auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock")); + @endcode + + @see local_stream_socket, epoll_t, iocp_t +*/ +template +class native_local_stream_socket : public local_stream_socket +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_stream_socket_type; + using service_type = typename backend_type::local_stream_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_awaitable + { + native_local_stream_socket& self_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_awaitable( + native_local_stream_socket& self, + MutableBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_awaitable + { + native_local_stream_socket& self_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_awaitable( + native_local_stream_socket& self, + ConstBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + struct native_wait_awaitable + { + native_local_stream_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_stream_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_local_stream_socket& self_; + corosio::local_endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable( + native_local_stream_socket& self, + corosio::local_endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + +public: + /** Construct a native socket from an execution context. + + @param ctx The execution context that will own this socket. + */ + explicit native_local_stream_socket(capy::execution_context& ctx) + : io_object(create_handle(ctx)) + { + } + + /** Construct a native socket from an executor. + + @param ex The executor whose context will own the socket. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_stream_socket>) && + capy::Executor + explicit native_local_stream_socket(Ex const& ex) + : native_local_stream_socket(ex.context()) + { + } + + /// Move construct. + native_local_stream_socket(native_local_stream_socket&&) noexcept = default; + + /// Move assign. + native_local_stream_socket& + operator=(native_local_stream_socket&&) noexcept = default; + + native_local_stream_socket(native_local_stream_socket const&) = delete; + native_local_stream_socket& + operator=(native_local_stream_socket const&) = delete; + + /** Asynchronously read data from the socket. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::read_some. + + @param buffers The buffer sequence to read into. + + @return An awaitable yielding `(error_code, std::size_t)`. + */ + template + auto read_some(MB const& buffers) + { + return native_read_awaitable(*this, buffers); + } + + /** Asynchronously write data to the socket. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::write_some. + + @param buffers The buffer sequence to write from. + + @return An awaitable yielding `(error_code, std::size_t)`. + */ + template + auto write_some(CB const& buffers) + { + return native_write_awaitable(*this, buffers); + } + + /** Asynchronously connect to a remote endpoint. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_socket::connect. + + If the socket is not already open, it is opened automatically. + + @param ep The local endpoint (path) to connect to. + + @return An awaitable yielding `io_result<>`. + + @throws std::system_error if the socket needs to be opened + and the open fails. + */ + auto connect(corosio::local_endpoint ep) + { + if (!is_open()) + open(); + return native_connect_awaitable(*this, ep); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP diff --git a/include/boost/corosio/native/native_random_access_file.hpp b/include/boost/corosio/native/native_random_access_file.hpp new file mode 100644 index 000000000..33387dc05 --- /dev/null +++ b/include/boost/corosio/native/native_random_access_file.hpp @@ -0,0 +1,228 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_SELECT || \ + BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** A random-access file with devirtualized async I/O operations. + + This class template inherits from @ref random_access_file and + shadows `read_some_at` / `write_some_at` with versions that + call the backend implementation directly, allowing the compiler + to inline through the entire call chain. + + Non-async operations (`open`, `close`, `size`, `resize`, + `sync_data`, `sync_all`) remain unchanged and dispatch through + the compiled library. + + A `native_random_access_file` IS-A `random_access_file` and + can be passed to any function expecting `random_access_file&`, + in which case virtual dispatch is used transparently. + + @note On POSIX platforms, file I/O is dispatched to a thread + pool regardless of the chosen reactor backend, so all three + reactor tags (`epoll`, `select`, `kqueue`) resolve to the same + underlying implementation. The `Backend` template parameter + exists for API symmetry with @ref native_tcp_socket and friends. + The vtable savings are smaller relative to the thread-pool / + overlapped-I/O cost than they are for socket operations. + + @tparam Backend A backend tag value (e.g., `epoll`, `iocp`). + + @par Thread Safety + Same as @ref random_access_file. + + @par Example + @code + #include + + native_io_context ctx; + native_random_access_file f(ctx); + f.open("data.bin", file_base::read_only); + char buf[4096]; + auto [ec, n] = co_await f.read_some_at( + 0, capy::mutable_buffer(buf, sizeof(buf))); + @endcode + + @see random_access_file, epoll_t, iocp_t +*/ +template +class native_random_access_file : public random_access_file +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::random_access_file_type; + using service_type = + typename backend_type::random_access_file_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_at_awaitable + { + native_random_access_file& self_; + std::uint64_t offset_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_at_awaitable( + native_random_access_file& self, + std::uint64_t offset, + MutableBufferSequence buffers) noexcept + : self_(self) + , offset_(offset) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some_at( + offset_, h, env->executor, buffers_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_at_awaitable + { + native_random_access_file& self_; + std::uint64_t offset_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_at_awaitable( + native_random_access_file& self, + std::uint64_t offset, + ConstBufferSequence buffers) noexcept + : self_(self) + , offset_(offset) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some_at( + offset_, h, env->executor, buffers_, + token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native random-access file from an execution context. + + @param ctx The execution context that will own this file. + */ + explicit native_random_access_file(capy::execution_context& ctx) + : random_access_file(create_handle(ctx)) + { + } + + /** Construct a native random-access file from an executor. + + @param ex The executor whose context will own this file. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_random_access_file>) && + capy::Executor + explicit native_random_access_file(Ex const& ex) + : native_random_access_file(ex.context()) + { + } + + /// Move construct. + native_random_access_file(native_random_access_file&&) noexcept = default; + + /// Move assign. + native_random_access_file& + operator=(native_random_access_file&&) noexcept = default; + + native_random_access_file(native_random_access_file const&) = delete; + native_random_access_file& + operator=(native_random_access_file const&) = delete; + + /** Asynchronously read at the given offset. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref random_access_file::read_some_at. + */ + template + auto read_some_at(std::uint64_t offset, MB const& buffers) + { + return native_read_at_awaitable(*this, offset, buffers); + } + + /** Asynchronously write at the given offset. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref random_access_file::write_some_at. + */ + template + auto write_some_at(std::uint64_t offset, CB const& buffers) + { + return native_write_at_awaitable(*this, offset, buffers); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP diff --git a/include/boost/corosio/native/native_stream_file.hpp b/include/boost/corosio/native/native_stream_file.hpp new file mode 100644 index 000000000..ed1b15e18 --- /dev/null +++ b/include/boost/corosio/native/native_stream_file.hpp @@ -0,0 +1,214 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_SELECT || \ + BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** A sequential file with devirtualized async I/O operations. + + This class template inherits from @ref stream_file and shadows + `read_some` / `write_some` with versions that call the backend + implementation directly, allowing the compiler to inline through + the entire call chain. + + Non-async operations (`open`, `close`, `size`, `resize`, `seek`, + `sync_data`, `sync_all`) remain unchanged and dispatch through + the compiled library. + + A `native_stream_file` IS-A `stream_file` and can be passed to + any function expecting `stream_file&` or `io_stream&`, in which + case virtual dispatch is used transparently. + + @note On POSIX platforms, file I/O is dispatched to a thread + pool regardless of the chosen reactor backend, so all three + reactor tags (`epoll`, `select`, `kqueue`) resolve to the same + underlying implementation. The `Backend` template parameter + exists for API symmetry with @ref native_tcp_socket and friends. + The vtable savings are smaller relative to the thread-pool / + overlapped-I/O cost than they are for socket operations. + + @tparam Backend A backend tag value (e.g., `epoll`, `iocp`). + + @par Thread Safety + Same as @ref stream_file. + + @par Example + @code + #include + + native_io_context ctx; + native_stream_file f(ctx); + f.open("data.bin", file_base::read_only); + char buf[4096]; + auto [ec, n] = co_await f.read_some( + capy::mutable_buffer(buf, sizeof(buf))); + @endcode + + @see stream_file, epoll_t, iocp_t +*/ +template +class native_stream_file : public stream_file +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::stream_file_type; + using service_type = typename backend_type::stream_file_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_awaitable + { + native_stream_file& self_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_awaitable( + native_stream_file& self, + MutableBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_awaitable + { + native_stream_file& self_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_awaitable( + native_stream_file& self, + ConstBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native stream file from an execution context. + + @param ctx The execution context that will own this file. + */ + explicit native_stream_file(capy::execution_context& ctx) + : io_object(create_handle(ctx)) + { + } + + /** Construct a native stream file from an executor. + + @param ex The executor whose context will own this file. + */ + template + requires(!std::same_as, native_stream_file>) && + capy::Executor + explicit native_stream_file(Ex const& ex) : native_stream_file(ex.context()) + { + } + + /// Move construct. + native_stream_file(native_stream_file&&) noexcept = default; + + /// Move assign. + native_stream_file& operator=(native_stream_file&&) noexcept = default; + + native_stream_file(native_stream_file const&) = delete; + native_stream_file& operator=(native_stream_file const&) = delete; + + /** Asynchronously read data from the file. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::read_some. + */ + template + auto read_some(MB const& buffers) + { + return native_read_awaitable(*this, buffers); + } + + /** Asynchronously write data to the file. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::write_some. + */ + template + auto write_some(CB const& buffers) + { + return native_write_awaitable(*this, buffers); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP diff --git a/include/boost/corosio/native/native_tcp_acceptor.hpp b/include/boost/corosio/native/native_tcp_acceptor.hpp index 7546d56f8..75835852b 100644 --- a/include/boost/corosio/native/native_tcp_acceptor.hpp +++ b/include/boost/corosio/native/native_tcp_acceptor.hpp @@ -65,6 +65,40 @@ class native_tcp_acceptor : public tcp_acceptor return *static_cast(h_.get()); } + struct native_wait_awaitable + { + native_tcp_acceptor& acc_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_tcp_acceptor& acc, wait_type w) noexcept + : acc_(acc) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + struct native_accept_awaitable { native_tcp_acceptor& acc_; @@ -169,6 +203,20 @@ class native_tcp_acceptor : public tcp_acceptor detail::throw_logic_error("accept: acceptor not listening"); return native_accept_awaitable(*this, peer); } + + /** Asynchronously wait for the acceptor to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref tcp_acceptor::wait. + + @param w The wait direction (typically `wait_type::read`). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/native/native_tcp_socket.hpp b/include/boost/corosio/native/native_tcp_socket.hpp index 03e1e2bf7..94686e996 100644 --- a/include/boost/corosio/native/native_tcp_socket.hpp +++ b/include/boost/corosio/native/native_tcp_socket.hpp @@ -153,6 +153,40 @@ class native_tcp_socket : public tcp_socket } }; + struct native_wait_awaitable + { + native_tcp_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_tcp_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + struct native_connect_awaitable { native_tcp_socket& self_; @@ -293,6 +327,20 @@ class native_tcp_socket : public tcp_socket detail::throw_logic_error("connect: socket not open"); return native_connect_awaitable(*this, ep); } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref tcp_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/native/native_udp_socket.hpp b/include/boost/corosio/native/native_udp_socket.hpp index 805b463f2..c7148892c 100644 --- a/include/boost/corosio/native/native_udp_socket.hpp +++ b/include/boost/corosio/native/native_udp_socket.hpp @@ -36,9 +36,10 @@ namespace boost::corosio { /** An asynchronous UDP socket with devirtualized I/O operations. This class template inherits from @ref udp_socket and shadows - the async operations (`send_to`, `recv_from`) with versions - that call the backend implementation directly, allowing the - compiler to inline through the entire call chain. + the async operations (`send_to`, `recv_from`, `connect`, `send`, + `recv`) with versions that call the backend implementation + directly, allowing the compiler to inline through the entire + call chain. Non-async operations (`open`, `close`, `cancel`, `bind`, socket options) remain unchanged and dispatch through the @@ -172,6 +173,158 @@ class native_udp_socket : public udp_socket } }; + struct native_wait_awaitable + { + native_udp_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_udp_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_udp_socket& self_; + endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable(native_udp_socket& self, endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + + template + struct native_send_awaitable + { + native_udp_socket& self_; + ConstBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_awaitable( + native_udp_socket& self, + ConstBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_awaitable + { + native_udp_socket& self_; + MutableBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_awaitable( + native_udp_socket& self, + MutableBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + public: /** Construct a native UDP socket from an execution context. @@ -262,6 +415,98 @@ class native_udp_socket : public udp_socket { return recv_from(buffers, source, corosio::message_flags::none); } + + /** Asynchronously connect to set the default peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::connect. + + If the socket is not already open, it is opened automatically + using the address family of @p ep. + + @param ep The remote endpoint to connect to. + + @return An awaitable yielding `io_result<>`. + + @throws std::system_error if the socket needs to be opened + and the open fails. + */ + auto connect(endpoint ep) + { + if (!is_open()) + open(ep.is_v6() ? udp::v6() : udp::v4()); + return native_connect_awaitable(*this, ep); + } + + /** Send a datagram to the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::send. + + @param buffers The buffer sequence containing data to send. + @param flags Message flags. + + @return An awaitable yielding `(error_code, std::size_t)`. + + @throws std::logic_error if the socket is not open. + */ + template + auto send(CB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send: socket not open"); + return native_send_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto send(CB const& buffers) + { + return send(buffers, corosio::message_flags::none); + } + + /** Receive a datagram from the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::recv. + + @param buffers The buffer sequence to receive data into. + @param flags Message flags (e.g. message_flags::peek). + + @return An awaitable yielding `(error_code, std::size_t)`. + + @throws std::logic_error if the socket is not open. + */ + template + auto recv(MB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv: socket not open"); + return native_recv_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto recv(MB const& buffers) + { + return recv(buffers, corosio::message_flags::none); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/random_access_file.hpp b/include/boost/corosio/random_access_file.hpp index cd7d6ea7a..8e77a2b9d 100644 --- a/include/boost/corosio/random_access_file.hpp +++ b/include/boost/corosio/random_access_file.hpp @@ -361,6 +361,10 @@ class BOOST_COROSIO_DECL random_access_file : public io_object */ void assign(native_handle_type handle); +protected: + /// Construct from a pre-built handle (for native_random_access_file). + explicit random_access_file(handle h) noexcept : io_object(std::move(h)) {} + private: inline implementation& get() const noexcept { diff --git a/include/boost/corosio/stream_file.hpp b/include/boost/corosio/stream_file.hpp index 89f88784a..292908ae2 100644 --- a/include/boost/corosio/stream_file.hpp +++ b/include/boost/corosio/stream_file.hpp @@ -252,6 +252,13 @@ class BOOST_COROSIO_DECL stream_file : public io_stream seek(std::int64_t offset, file_base::seek_basis origin = file_base::seek_set); +protected: + /// Default-construct (for derived types that initialize io_object directly). + stream_file() noexcept = default; + + /// Construct from a pre-built handle (for native_stream_file). + explicit stream_file(handle h) noexcept : io_object(std::move(h)) {} + private: inline implementation& get() const noexcept { diff --git a/include/boost/corosio/test/local_socket_pair.hpp b/include/boost/corosio/test/local_socket_pair.hpp new file mode 100644 index 000000000..9793009c7 --- /dev/null +++ b/include/boost/corosio/test/local_socket_pair.hpp @@ -0,0 +1,142 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_TEST_LOCAL_SOCKET_PAIR_HPP +#define BOOST_COROSIO_TEST_LOCAL_SOCKET_PAIR_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::test { + +/** Create a connected pair of AF_UNIX stream sockets via bind+accept+connect. + + Unlike the library-side @ref make_local_stream_pair (POSIX-only, + socketpair-based), this helper drives the public acceptor API so + it can produce native template wrappers like + `native_local_stream_socket` — the path benchmarks need + to exercise the shadowed read_some/write_some/connect ops. + + @tparam Socket Concrete or native local stream socket type. + @tparam Acceptor Matching acceptor type. + + @param ctx I/O context backing both sockets. + + @return Connected pair `{accepted, connected}`. +*/ +template< + class Socket = local_stream_socket, + class Acceptor = local_stream_acceptor> +std::pair +make_local_stream_pair(io_context& ctx) +{ + namespace fs = std::filesystem; + + static std::random_device rd; + static std::mt19937_64 gen{rd()}; + + std::string path; + for (int attempt = 0; attempt < 16; ++attempt) + { + std::string name = "co_pair_"; + name += std::to_string(gen()); + auto candidate = fs::temp_directory_path() / name; + std::error_code ec; + if (fs::create_directory(candidate, ec)) + { + path = (candidate / "s").string(); + break; + } + } + if (path.empty()) + throw std::runtime_error("make_local_stream_pair: temp path failed"); + + auto ex = ctx.get_executor(); + + Acceptor acc(ctx); + acc.open(); + if (auto ec = acc.bind(local_endpoint(path))) + throw std::runtime_error( + "local_stream_pair bind failed: " + ec.message()); + if (auto ec = acc.listen()) + throw std::runtime_error( + "local_stream_pair listen failed: " + ec.message()); + + Socket s1(ctx); + Socket s2(ctx); + s2.open(); + + std::error_code accept_ec, connect_ec; + bool accept_done = false, connect_done = false; + + capy::run_async(ex)( + [](Acceptor& a, Socket& s, std::error_code& ec_out, + bool& done_out) -> capy::task<> { + auto [ec] = co_await a.accept(s); + ec_out = ec; + done_out = true; + }(acc, s1, accept_ec, accept_done)); + + capy::run_async(ex)( + [](Socket& s, local_endpoint ep, std::error_code& ec_out, + bool& done_out) -> capy::task<> { + auto [ec] = co_await s.connect(ep); + ec_out = ec; + done_out = true; + }(s2, local_endpoint(path), connect_ec, connect_done)); + + ctx.run(); + ctx.restart(); + + // The bind path on disk is no longer needed once accept/connect + // have rendezvoused; remove the file and its parent directory so + // repeated bench invocations don't accumulate cruft under /tmp. + std::error_code rm_ec; + fs::remove(fs::path(path), rm_ec); + fs::remove(fs::path(path).parent_path(), rm_ec); + + if (!accept_done || accept_ec) + { + std::fprintf( + stderr, "local_stream_pair: accept failed (done=%d, ec=%s)\n", + accept_done, accept_ec.message().c_str()); + acc.close(); + throw std::runtime_error("local_stream_pair accept failed"); + } + + if (!connect_done || connect_ec) + { + std::fprintf( + stderr, "local_stream_pair: connect failed (done=%d, ec=%s)\n", + connect_done, connect_ec.message().c_str()); + acc.close(); + s1.close(); + throw std::runtime_error("local_stream_pair connect failed"); + } + + acc.close(); + + return {std::move(s1), std::move(s2)}; +} + +} // namespace boost::corosio::test + +#endif diff --git a/perf/bench/corosio/local_socket_latency_bench.cpp b/perf/bench/corosio/local_socket_latency_bench.cpp index 8b548ddd9..c8d4a6594 100644 --- a/perf/bench/corosio/local_socket_latency_bench.cpp +++ b/perf/bench/corosio/local_socket_latency_bench.cpp @@ -15,7 +15,9 @@ #include #include -#include +#include +#include +#include #include #include #include @@ -36,8 +38,8 @@ namespace { template capy::task<> unix_pingpong_client_task( - corosio::local_stream_socket& client, - corosio::local_stream_socket& server, + corosio::native_local_stream_socket& client, + corosio::native_local_stream_socket& server, std::size_t message_size, bench::state& state) { @@ -76,11 +78,15 @@ template void bench_unix_pingpong_latency(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto message_size = static_cast(state.range(0)); state.counters["message_size"] = static_cast(message_size); corosio::native_io_context ioc; - auto [client, server] = corosio::make_local_stream_pair(ioc); + auto [client, server] = + corosio::test::make_local_stream_pair(ioc); capy::run_async(ioc.get_executor())( unix_pingpong_client_task(client, server, message_size, state)); @@ -104,20 +110,25 @@ template void bench_unix_concurrent_latency(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + int num_pairs = static_cast(state.range(0)); state.counters["num_pairs"] = num_pairs; corosio::native_io_context ioc; - std::vector clients; - std::vector servers; + std::vector clients; + std::vector servers; clients.reserve(num_pairs); servers.reserve(num_pairs); for (int i = 0; i < num_pairs; ++i) { - auto [c, s] = corosio::make_local_stream_pair(ioc); + auto [c, s] = + corosio::test::make_local_stream_pair( + ioc); clients.push_back(std::move(c)); servers.push_back(std::move(s)); } @@ -150,13 +161,17 @@ template void bench_unix_pingpong_latency_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto message_size = static_cast(state.range(0)); state.counters["message_size"] = static_cast(message_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [client, server] = corosio::make_local_stream_pair(ioc); + auto [client, server] = + corosio::test::make_local_stream_pair(ioc); capy::run_async(ioc.get_executor())( unix_pingpong_client_task(client, server, message_size, state)); @@ -180,6 +195,9 @@ template void bench_unix_concurrent_latency_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + int num_pairs = static_cast(state.range(0)); state.counters["num_pairs"] = num_pairs; @@ -187,15 +205,17 @@ bench_unix_concurrent_latency_lockless(bench::state& state) opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - std::vector clients; - std::vector servers; + std::vector clients; + std::vector servers; clients.reserve(num_pairs); servers.reserve(num_pairs); for (int i = 0; i < num_pairs; ++i) { - auto [c, s] = corosio::make_local_stream_pair(ioc); + auto [c, s] = + corosio::test::make_local_stream_pair( + ioc); clients.push_back(std::move(c)); servers.push_back(std::move(s)); } diff --git a/perf/bench/corosio/local_socket_throughput_bench.cpp b/perf/bench/corosio/local_socket_throughput_bench.cpp index f57555bd8..4bacffecc 100644 --- a/perf/bench/corosio/local_socket_throughput_bench.cpp +++ b/perf/bench/corosio/local_socket_throughput_bench.cpp @@ -15,7 +15,9 @@ #include #include -#include +#include +#include +#include #include #include #include @@ -35,11 +37,15 @@ template void bench_unix_throughput(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::native_io_context ioc; - auto [writer, reader] = corosio::make_local_stream_pair(ioc); + auto [writer, reader] = + corosio::test::make_local_stream_pair(ioc); std::vector write_buf(chunk_size, 'x'); std::vector read_buf(chunk_size); @@ -93,11 +99,15 @@ template void bench_unix_bidirectional_throughput(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::native_io_context ioc; - auto [sock1, sock2] = corosio::make_local_stream_pair(ioc); + auto [sock1, sock2] = + corosio::test::make_local_stream_pair(ioc); std::vector buf1(chunk_size, 'a'); std::vector buf2(chunk_size, 'b'); @@ -178,13 +188,17 @@ template void bench_unix_throughput_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [writer, reader] = corosio::make_local_stream_pair(ioc); + auto [writer, reader] = + corosio::test::make_local_stream_pair(ioc); std::vector write_buf(chunk_size, 'x'); std::vector read_buf(chunk_size); @@ -238,13 +252,17 @@ template void bench_unix_bidirectional_throughput_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [sock1, sock2] = corosio::make_local_stream_pair(ioc); + auto [sock1, sock2] = + corosio::test::make_local_stream_pair(ioc); std::vector buf1(chunk_size, 'a'); std::vector buf2(chunk_size, 'b'); diff --git a/src/corosio/src/local_stream_acceptor.cpp b/src/corosio/src/local_stream_acceptor.cpp index 753521940..0378e58c9 100644 --- a/src/corosio/src/local_stream_acceptor.cpp +++ b/src/corosio/src/local_stream_acceptor.cpp @@ -16,6 +16,14 @@ #if BOOST_COROSIO_POSIX #include +#else +// Windows: AF_UNIX socket files are reparse points with tag +// IO_REPARSE_TAG_AF_UNIX. DeleteFileA reliably removes them; +// std::filesystem::remove via libstdc++/MS STL was observed to +// silently leave them in place on at least some Windows hosts, +// so call the Win32 API directly. +#define WIN32_LEAN_AND_MEAN +#include #endif namespace boost::corosio { @@ -51,22 +59,20 @@ local_stream_acceptor::bind(corosio::local_endpoint ep, bind_option opt) if (!is_open()) detail::throw_logic_error("bind: acceptor not open"); -#if BOOST_COROSIO_POSIX if (opt == bind_option::unlink_existing && !ep.empty() && !ep.is_abstract()) { - // Best-effort removal; ENOENT is fine. + // Best-effort removal; missing file is fine. auto p = ep.path(); - // path() is not null-terminated for the fixed buffer, - // so copy to a local array for unlink. char buf[local_endpoint::max_path_length + 1]; std::memcpy(buf, p.data(), p.size()); buf[p.size()] = '\0'; +#if BOOST_COROSIO_POSIX ::unlink(buf); - } #else - (void)opt; + ::DeleteFileA(buf); #endif + } auto& svc = static_cast(h_.service()); diff --git a/test/unit/local_datagram_socket.cpp b/test/unit/local_datagram_socket.cpp index b87341381..1f8076867 100644 --- a/test/unit/local_datagram_socket.cpp +++ b/test/unit/local_datagram_socket.cpp @@ -12,6 +12,10 @@ #include +// AF_UNIX SOCK_DGRAM is POSIX-only in practice. Windows added AF_UNIX +// in Win10 1803 but never SOCK_DGRAM over it, so WSASocket fails on +// the very first open() and every test in this file would throw. +// Keep the entire suite POSIX-gated until Windows kernel support lands. #if BOOST_COROSIO_POSIX #include @@ -23,36 +27,14 @@ #include #include -#include -#include - #include "context.hpp" +#include "local_temp.hpp" #include "test_suite.hpp" namespace boost::corosio { -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace +using test::make_temp_socket_path; +using test::cleanup_temp_socket; template struct local_datagram_socket_test @@ -145,7 +127,7 @@ struct local_datagram_socket_test auto ec = sock.bind(local_endpoint(path)); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); + cleanup_temp_socket(path); } void testSendToRecvFrom() @@ -212,8 +194,8 @@ struct local_datagram_socket_test // Source endpoint should be the sender's bound path BOOST_TEST_EQ(source.path(), path1); - cleanup_path(path1); - cleanup_path(path2); + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); } void testBindFailure() @@ -520,8 +502,8 @@ struct local_datagram_socket_test BOOST_TEST_EQ(src1.path(), path1); BOOST_TEST_EQ(src2.path(), path1); - cleanup_path(path1); - cleanup_path(path2); + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); } void run() @@ -547,8 +529,4 @@ COROSIO_BACKEND_TESTS( } // namespace boost::corosio -#else // !BOOST_COROSIO_POSIX - -// Empty on non-POSIX platforms - -#endif +#endif // BOOST_COROSIO_POSIX diff --git a/test/unit/local_stream_socket.cpp b/test/unit/local_stream_socket.cpp index 8ee995255..0468e99aa 100644 --- a/test/unit/local_stream_socket.cpp +++ b/test/unit/local_stream_socket.cpp @@ -11,9 +11,6 @@ #include #include - -#if BOOST_COROSIO_POSIX - #include #include #include @@ -32,9 +29,12 @@ #include #include +#if BOOST_COROSIO_POSIX #include +#endif #include "context.hpp" +#include "local_temp.hpp" #include "test_suite.hpp" namespace boost::corosio { @@ -44,28 +44,8 @@ namespace boost::corosio { static_assert(capy::ReadStream); static_assert(capy::WriteStream); -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace +using test::make_temp_socket_path; +using test::cleanup_temp_socket; template struct local_stream_socket_test @@ -140,7 +120,7 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); + cleanup_temp_socket(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -191,7 +171,7 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); + cleanup_temp_socket(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -200,6 +180,8 @@ struct local_stream_socket_test BOOST_TEST_EQ(!connect_ec, true); } +#if BOOST_COROSIO_POSIX + // Uses make_local_stream_pair, which is socketpair-based and POSIX-only. void testReadWrite() { io_context ioc(Backend); @@ -255,6 +237,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(s1.is_open(), true); BOOST_TEST_EQ(s2.is_open(), true); } +#endif // BOOST_COROSIO_POSIX void testUnlinkExisting() { @@ -286,7 +269,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(!ec, true); } - cleanup_path(path); + cleanup_temp_socket(path); } void testUnlinkNonexistent() @@ -302,7 +285,7 @@ struct local_stream_socket_test local_endpoint(path), bind_option::unlink_existing); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); + cleanup_temp_socket(path); } void testEndpointOrdering() @@ -342,16 +325,22 @@ struct local_stream_socket_test testMove(); testConnectAccept(); testMoveAccept(); +#if BOOST_COROSIO_POSIX testReadWrite(); testSocketPair(); +#endif testUnlinkExisting(); testUnlinkNonexistent(); testEndpointOrdering(); testEndpointStreamOutput(); +#if BOOST_COROSIO_POSIX testAvailable(); testRelease(); +#endif } +#if BOOST_COROSIO_POSIX + // Uses make_local_stream_pair, which is socketpair-based and POSIX-only. void testAvailable() { io_context ioc(Backend); @@ -378,7 +367,13 @@ struct local_stream_socket_test BOOST_TEST_EQ(done, true); BOOST_TEST_EQ(s2.available(), std::strlen(msg)); } +#endif // BOOST_COROSIO_POSIX +#if BOOST_COROSIO_POSIX + // Exercises raw POSIX fd ops (::write, ::close) on the released + // descriptor. The released-handle semantics are tested via the + // platform helpers; skipped on Windows because the analogous + // path needs send/closesocket and isn't yet factored. void testRelease() { io_context ioc(Backend); @@ -395,6 +390,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(::write(fd, msg, std::strlen(msg)) > 0, true); ::close(fd); } +#endif void testEndpointStreamOutput() { @@ -429,9 +425,3 @@ COROSIO_BACKEND_TESTS( local_stream_socket_test, "boost.corosio.local_stream_socket") } // namespace boost::corosio - -#else // !BOOST_COROSIO_POSIX - -// Empty on non-POSIX platforms - -#endif diff --git a/test/unit/local_temp.hpp b/test/unit/local_temp.hpp new file mode 100644 index 000000000..cc4b4d1c1 --- /dev/null +++ b/test/unit/local_temp.hpp @@ -0,0 +1,68 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +/* Portable temporary path helpers for Unix-domain socket tests. + + Replaces the POSIX-only mkdtemp/unlink/rmdir pattern with + std::filesystem so tests can run on Windows (which supports + AF_UNIX since Windows 10 build 17061). + + Paths are kept short — AF_UNIX limits sun_path to ~108 bytes + on POSIX and ~108 bytes on Windows. The "co_t_" prefix and + hex random suffix keep the total well under that limit on + typical Windows installations. +*/ + +#ifndef BOOST_COROSIO_TEST_LOCAL_TEMP_HPP +#define BOOST_COROSIO_TEST_LOCAL_TEMP_HPP + +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::test { + +inline std::string +make_temp_socket_path(std::string_view prefix = "co_t_") +{ + namespace fs = std::filesystem; + + static std::random_device rd; + static std::mt19937_64 gen{rd()}; + + for (int attempt = 0; attempt < 16; ++attempt) + { + std::string name(prefix); + name += std::to_string(gen()); + + auto dir = fs::temp_directory_path() / name; + + std::error_code ec; + if (fs::create_directory(dir, ec)) + return (dir / "s").string(); + } + throw std::runtime_error("failed to create temp socket directory"); +} + +inline void +cleanup_temp_socket(std::string const& path) noexcept +{ + namespace fs = std::filesystem; + std::error_code ec; + fs::path p(path); + fs::remove(p, ec); + fs::remove(p.parent_path(), ec); +} + +} // namespace boost::corosio::test + +#endif diff --git a/test/unit/native/native_local_datagram_socket.cpp b/test/unit/native/native_local_datagram_socket.cpp new file mode 100644 index 000000000..7dab48906 --- /dev/null +++ b/test/unit/native/native_local_datagram_socket.cpp @@ -0,0 +1,313 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +// AF_UNIX SOCK_DGRAM is POSIX-only in practice. Windows added AF_UNIX +// in Win10 1803 but never SOCK_DGRAM over it, so WSASocket fails on +// the very first open() and every test in this file would throw. +#if BOOST_COROSIO_POSIX + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "context.hpp" +#include "local_temp.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +using test::make_temp_socket_path; +using test::cleanup_temp_socket; + +template +struct native_local_datagram_socket_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .send_to( + std::declval(), + std::declval())), + decltype(std::declval().send_to( + std::declval(), + std::declval()))>, + "native_local_datagram_socket::send_to must shadow local_datagram_socket::send_to"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .recv_from( + std::declval(), + std::declval())), + decltype(std::declval().recv_from( + std::declval(), + std::declval()))>, + "native_local_datagram_socket::recv_from must shadow local_datagram_socket::recv_from"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .connect(std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_local_datagram_socket::connect must shadow local_datagram_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .send(std::declval())), + decltype(std::declval().send( + std::declval()))>, + "native_local_datagram_socket::send must shadow local_datagram_socket::send"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .recv(std::declval())), + decltype(std::declval().recv( + std::declval()))>, + "native_local_datagram_socket::recv must shadow local_datagram_socket::recv"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .wait(wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_datagram_socket::wait must shadow local_datagram_socket::wait"); + + void testConstruct() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testOpen() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + s.open(); + BOOST_TEST(s.is_open()); + s.close(); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + s.open(); + local_datagram_socket& base = s; + BOOST_TEST(base.is_open()); + } + + void testSendToRecvFrom() + { + io_context ioc(Backend); + auto path1 = make_temp_socket_path(); + auto path2 = make_temp_socket_path(); + + native_local_datagram_socket sender(ioc); + native_local_datagram_socket receiver(ioc); + sender.open(); + receiver.open(); + + auto ec1 = sender.bind(local_endpoint(path1)); + auto ec2 = receiver.bind(local_endpoint(path2)); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST_EQ(ec2, std::error_code{}); + + auto task = + [](native_local_datagram_socket& s, + native_local_datagram_socket& r, + local_endpoint dest) -> capy::task<> { + char const msg[] = "native dgram"; + auto [sec, sn] = + co_await s.send_to(capy::const_buffer(msg, sizeof(msg)), dest); + BOOST_TEST_EQ(sec, std::error_code{}); + BOOST_TEST_EQ(sn, sizeof(msg)); + + char buf[64] = {}; + local_endpoint source; + auto [rec, rn] = co_await r.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(rn, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "native dgram"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(sender, receiver, local_endpoint(path2))); + ioc.run(); + + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); + } + + void testSendRecvConnected() + { + io_context ioc(Backend); + auto path_a = make_temp_socket_path(); + auto path_b = make_temp_socket_path(); + + native_local_datagram_socket a(ioc); + native_local_datagram_socket b(ioc); + a.open(); + b.open(); + + auto eca = a.bind(local_endpoint(path_a)); + auto ecb = b.bind(local_endpoint(path_b)); + BOOST_TEST_EQ(eca, std::error_code{}); + BOOST_TEST_EQ(ecb, std::error_code{}); + + auto task = + [](native_local_datagram_socket& a, + native_local_datagram_socket& b, + local_endpoint a_to_b, + local_endpoint b_to_a) -> capy::task<> { + auto [ec1] = co_await a.connect(a_to_b); + BOOST_TEST_EQ(ec1, std::error_code{}); + auto [ec2] = co_await b.connect(b_to_a); + BOOST_TEST_EQ(ec2, std::error_code{}); + + char const msg[] = "connected dgram"; + auto [sec, sn] = + co_await a.send(capy::const_buffer(msg, sizeof(msg))); + BOOST_TEST_EQ(sec, std::error_code{}); + BOOST_TEST_EQ(sn, sizeof(msg)); + + char buf[64] = {}; + auto [rec, rn] = + co_await b.recv(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(rn, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "connected dgram"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task( + a, b, local_endpoint(path_b), local_endpoint(path_a))); + ioc.run(); + + cleanup_temp_socket(path_a); + cleanup_temp_socket(path_b); + } + + void testVirtualDispatchFallback() + { + io_context ioc(Backend); + auto path1 = make_temp_socket_path(); + auto path2 = make_temp_socket_path(); + + native_local_datagram_socket sender(ioc); + native_local_datagram_socket receiver(ioc); + sender.open(); + receiver.open(); + + auto ec1 = sender.bind(local_endpoint(path1)); + auto ec2 = receiver.bind(local_endpoint(path2)); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST_EQ(ec2, std::error_code{}); + + local_datagram_socket& s_ref = sender; + local_datagram_socket& r_ref = receiver; + + auto task = [](local_datagram_socket& s, local_datagram_socket& r, + local_endpoint dest) -> capy::task<> { + char const msg[] = "virtual"; + auto [sec, sn] = + co_await s.send_to(capy::const_buffer(msg, sizeof(msg)), dest); + BOOST_TEST_EQ(sec, std::error_code{}); + + char buf[64] = {}; + local_endpoint source; + auto [rec, rn] = co_await r.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::strcmp(buf, "virtual"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(s_ref, r_ref, local_endpoint(path2))); + ioc.run(); + + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); + } + + // Exercise the shadowed wait() awaitable: wait_type::read on a + // bound datagram socket resolves when a datagram arrives. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto rx_path = test::make_temp_socket_path(); + + native_local_datagram_socket recv(ioc); + recv.open(); + auto bec = recv.bind(local_endpoint(rx_path)); + BOOST_TEST(!bec); + + native_local_datagram_socket send(ioc); + send.open(); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await recv.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto sender = [&]() -> capy::task<> { + char dg[1] = {'X'}; + auto [ec, n] = co_await send.send_to( + capy::const_buffer(dg, sizeof(dg)), + local_endpoint(rx_path)); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(sender()); + ioc.run(); + + test::cleanup_temp_socket(rx_path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + void run() + { + testConstruct(); + testOpen(); + testPolymorphicSlice(); + testSendToRecvFrom(); + testSendRecvConnected(); + testVirtualDispatchFallback(); + testWait(); + } +}; + +COROSIO_BACKEND_TESTS( + native_local_datagram_socket_test, + "boost.corosio.native.local_datagram_socket") + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_POSIX diff --git a/test/unit/native/native_local_stream_socket.cpp b/test/unit/native/native_local_stream_socket.cpp new file mode 100644 index 000000000..0db9a2166 --- /dev/null +++ b/test/unit/native/native_local_stream_socket.cpp @@ -0,0 +1,359 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include "context.hpp" +#include "local_temp.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +using test::make_temp_socket_path; +using test::cleanup_temp_socket; + +template +struct native_local_stream_socket_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .read_some(std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_local_stream_socket::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .write_some(std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_local_stream_socket::write_some must shadow io_stream::write_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .connect(std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_local_stream_socket::connect must shadow local_stream_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .accept(std::declval())), + decltype(std::declval().accept( + std::declval()))>, + "native_local_stream_acceptor::accept(peer) must shadow local_stream_acceptor::accept(peer)"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .accept()), + decltype(std::declval().accept())>, + "native_local_stream_acceptor::accept() must shadow local_stream_acceptor::accept()"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_stream_socket::wait must shadow local_stream_socket::wait"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .wait(wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_stream_acceptor::wait must shadow local_stream_acceptor::wait"); + + void testConstruct() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testOpen() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + s.open(); + BOOST_TEST(s.is_open()); + s.close(); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + s.open(); + local_stream_socket& base = s; + BOOST_TEST(base.is_open()); + } + + void testConnectAcceptReadWrite() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + auto acceptor_task = + [](native_local_stream_acceptor& a, + native_local_stream_socket& s) -> capy::task<> { + auto [ec] = co_await a.accept(s); + BOOST_TEST_EQ(ec, std::error_code{}); + + char buf[64] = {}; + auto [rec, n] = + co_await s.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("native unix")); + }; + + auto client_task = [](native_local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "native unix"; + auto [wec, n] = + co_await c.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(wec, std::error_code{}); + BOOST_TEST_EQ(n, sizeof(msg) - 1); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc, server)); + capy::run_async(ex)(client_task(client, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + void testMoveAccept() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket client(ioc); + + auto acceptor_task = + [](native_local_stream_acceptor& a) -> capy::task<> { + auto [ec, peer] = co_await a.accept(); + BOOST_TEST_EQ(ec, std::error_code{}); + BOOST_TEST(peer.is_open()); + + char buf[64] = {}; + auto [rec, n] = + co_await peer.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("move accept")); + }; + + auto client_task = [](native_local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "move accept"; + auto [wec, n] = + co_await c.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(wec, std::error_code{}); + BOOST_TEST_EQ(n, sizeof(msg) - 1); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc)); + capy::run_async(ex)(client_task(client, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + void testVirtualDispatchFallback() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + local_stream_acceptor& acc_ref = acc; + local_stream_socket& server_ref = server; + local_stream_socket& client_ref = client; + + auto acceptor_task = [](local_stream_acceptor& a, + local_stream_socket& s) -> capy::task<> { + auto [ec] = co_await a.accept(s); + BOOST_TEST_EQ(ec, std::error_code{}); + + char buf[64] = {}; + auto [rec, n] = + co_await s.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("virtual")); + }; + + auto client_task = [](local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "virtual"; + (void)co_await c.write_some( + capy::const_buffer(msg, sizeof(msg) - 1)); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc_ref, server_ref)); + capy::run_async(ex)(client_task(client_ref, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + // Exercise the shadowed wait() awaitable on the socket: a + // connected stream is always writable, so wait_type::write + // resolves immediately on every backend. + void testSocketWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto path = test::make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto bec = acc.bind(local_endpoint(path)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto rendezvous = [&]() -> capy::task<> { + auto [ec] = co_await acc.accept(server); + (void)ec; + }; + auto connect_task = [&]() -> capy::task<> { + auto [ec] = co_await client.connect(local_endpoint(path)); + (void)ec; + }; + capy::run_async(ex)(rendezvous()); + capy::run_async(ex)(connect_task()); + ioc.run(); + ioc.restart(); + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await client.wait(wait_type::write); + wait_ec = ec; + wait_done = true; + }; + capy::run_async(ex)(waiter()); + ioc.run(); + + test::cleanup_temp_socket(path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + // Exercise the shadowed wait() awaitable on the acceptor: + // wait_type::read resolves once a client connects. + void testAcceptorWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto path = test::make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto bec = acc.bind(local_endpoint(path)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + + native_local_stream_socket client(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await acc.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto connect_task = [&]() -> capy::task<> { + auto [ec] = co_await client.connect(local_endpoint(path)); + (void)ec; + }; + capy::run_async(ex)(waiter()); + capy::run_async(ex)(connect_task()); + ioc.run(); + + test::cleanup_temp_socket(path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + void run() + { + testConstruct(); + testOpen(); + testPolymorphicSlice(); + testConnectAcceptReadWrite(); + testMoveAccept(); + testVirtualDispatchFallback(); + testSocketWait(); + testAcceptorWait(); + } +}; + +COROSIO_BACKEND_TESTS( + native_local_stream_socket_test, "boost.corosio.native.local_stream_socket") + +} // namespace boost::corosio diff --git a/test/unit/native/native_random_access_file.cpp b/test/unit/native/native_random_access_file.cpp new file mode 100644 index 000000000..4b45ee32b --- /dev/null +++ b/test/unit/native/native_random_access_file.cpp @@ -0,0 +1,205 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +namespace { + +struct temp_file +{ + std::filesystem::path path; + + explicit temp_file(std::string_view prefix = "corosio_native_raf_") + { + // Per-process random_device-seeded RNG so concurrent test + // processes (e.g. ctest --parallel running .epoll and .select + // variants of the same suite) don't collide on identical paths. + static thread_local std::mt19937_64 gen{std::random_device{}()}; + path = std::filesystem::temp_directory_path() + / (std::string(prefix) + std::to_string(gen())); + } + + temp_file(std::string_view prefix, std::string_view contents) + : temp_file(prefix) + { + std::ofstream ofs(path, std::ios::binary); + ofs.write( + contents.data(), + static_cast(contents.size())); + } + + ~temp_file() + { + std::error_code ec; + std::filesystem::remove(path, ec); + } + + temp_file(temp_file const&) = delete; + temp_file& operator=(temp_file const&) = delete; +}; + +} // namespace + +template +struct native_random_access_file_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .read_some_at( + std::uint64_t{0}, + std::declval())), + decltype(std::declval().read_some_at( + std::uint64_t{0}, + std::declval()))>, + "native_random_access_file::read_some_at must shadow random_access_file::read_some_at"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .write_some_at( + std::uint64_t{0}, + std::declval())), + decltype(std::declval().write_some_at( + std::uint64_t{0}, + std::declval()))>, + "native_random_access_file::write_some_at must shadow random_access_file::write_some_at"); + + void testConstruct() + { + io_context ioc(Backend); + native_random_access_file f(ioc); + BOOST_TEST_EQ(f.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + temp_file tmp("native_raf_slice_", "x"); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + random_access_file& base = f; + BOOST_TEST(base.is_open()); + } + + void testReadSomeAt() + { + std::string data = "ABCDEFGHIJ"; + temp_file tmp("native_raf_read_", data); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + char buf[5] = {}; + std::size_t n_out = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await f.read_some_at( + 3, capy::mutable_buffer(buf, 5)); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, 5u); + BOOST_TEST_EQ(std::memcmp(buf, "DEFGH", 5), 0); + } + + void testWriteSomeAt() + { + temp_file tmp("native_raf_write_"); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open( + tmp.path, + file_base::read_write | file_base::create | file_base::truncate); + + std::size_t written = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await f.write_some_at( + 0, capy::const_buffer("hello", 5)); + BOOST_TEST_EQ(ec, std::error_code{}); + written = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(written, 5u); + + f.close(); + std::ifstream ifs(tmp.path, std::ios::binary); + std::string contents( + (std::istreambuf_iterator(ifs)), + std::istreambuf_iterator()); + BOOST_TEST_EQ(contents, std::string("hello")); + } + + void testVirtualDispatchFallback() + { + std::string data = "fallback"; + temp_file tmp("native_raf_fb_", data); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + random_access_file& base = f; + + char buf[64] = {}; + std::size_t n_out = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await base.read_some_at( + 0, capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + BOOST_TEST_EQ(std::memcmp(buf, data.data(), data.size()), 0); + } + + void run() + { + testConstruct(); + testPolymorphicSlice(); + testReadSomeAt(); + testWriteSomeAt(); + testVirtualDispatchFallback(); + } +}; + +COROSIO_BACKEND_TESTS( + native_random_access_file_test, + "boost.corosio.native.random_access_file") + +} // namespace boost::corosio diff --git a/test/unit/native/native_resolver.cpp b/test/unit/native/native_resolver.cpp index 51a7789c9..4969357c8 100644 --- a/test/unit/native/native_resolver.cpp +++ b/test/unit/native/native_resolver.cpp @@ -13,6 +13,10 @@ #include #include +#include +#include +#include + #include "context.hpp" #include "test_suite.hpp" @@ -21,6 +25,25 @@ namespace boost::corosio { template struct native_resolver_test { + // resolve(host, service) - forward resolution + static_assert( + !std::is_same_v< + decltype(std::declval&>().resolve( + std::declval(), + std::declval())), + decltype(std::declval().resolve( + std::declval(), + std::declval()))>, + "native_resolver::resolve(host, service) must shadow resolver::resolve"); + // resolve(endpoint) - reverse resolution + static_assert( + !std::is_same_v< + decltype(std::declval&>().resolve( + std::declval())), + decltype(std::declval().resolve( + std::declval()))>, + "native_resolver::resolve(endpoint) must shadow resolver::resolve"); + void testResolverConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_signal_set.cpp b/test/unit/native/native_signal_set.cpp index 269c1e30d..2a32b7335 100644 --- a/test/unit/native/native_signal_set.cpp +++ b/test/unit/native/native_signal_set.cpp @@ -11,6 +11,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -20,6 +22,12 @@ namespace boost::corosio { template struct native_signal_set_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait()), + decltype(std::declval().wait())>, + "native_signal_set::wait must shadow io_signal_set::wait"); + void testSignalSetConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_stream_file.cpp b/test/unit/native/native_stream_file.cpp new file mode 100644 index 000000000..fc9771137 --- /dev/null +++ b/test/unit/native/native_stream_file.cpp @@ -0,0 +1,199 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +namespace { + +struct temp_file +{ + std::filesystem::path path; + + explicit temp_file(std::string_view prefix = "corosio_native_sf_") + { + // Per-process random_device-seeded RNG so concurrent test + // processes (e.g. ctest --parallel running .epoll and .select + // variants of the same suite) don't collide on identical paths. + static thread_local std::mt19937_64 gen{std::random_device{}()}; + path = std::filesystem::temp_directory_path() + / (std::string(prefix) + std::to_string(gen())); + } + + temp_file(std::string_view prefix, std::string_view contents) + : temp_file(prefix) + { + std::ofstream ofs(path, std::ios::binary); + ofs.write( + contents.data(), + static_cast(contents.size())); + } + + ~temp_file() + { + std::error_code ec; + std::filesystem::remove(path, ec); + } + + temp_file(temp_file const&) = delete; + temp_file& operator=(temp_file const&) = delete; +}; + +} // namespace + +template +struct native_stream_file_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>().read_some( + std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_stream_file::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().write_some( + std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_stream_file::write_some must shadow io_stream::write_some"); + + void testConstruct() + { + io_context ioc(Backend); + native_stream_file f(ioc); + BOOST_TEST_EQ(f.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + temp_file tmp("native_sf_slice_", "x"); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + stream_file& base = f; + BOOST_TEST(base.is_open()); + } + + void testReadSome() + { + std::string data = "hello native"; + temp_file tmp("native_sf_read_", data); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + char buf[64] = {}; + std::size_t n_out = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = + co_await f.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + BOOST_TEST_EQ(std::memcmp(buf, data.data(), data.size()), 0); + } + + void testWriteSome() + { + temp_file tmp("native_sf_write_"); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open( + tmp.path, + file_base::write_only | file_base::create | file_base::truncate); + + char const msg[] = "native write"; + std::size_t written = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = + co_await f.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(ec, std::error_code{}); + written = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(written, sizeof(msg) - 1); + + f.close(); + std::ifstream ifs(tmp.path, std::ios::binary); + std::string contents( + (std::istreambuf_iterator(ifs)), + std::istreambuf_iterator()); + BOOST_TEST_EQ(contents, std::string(msg)); + } + + void testVirtualDispatchFallback() + { + std::string data = "fallback"; + temp_file tmp("native_sf_fb_", data); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + stream_file& base = f; + + char buf[64] = {}; + std::size_t n_out = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await base.read_some( + capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + } + + void run() + { + testConstruct(); + testPolymorphicSlice(); + testReadSome(); + testWriteSome(); + testVirtualDispatchFallback(); + } +}; + +COROSIO_BACKEND_TESTS( + native_stream_file_test, "boost.corosio.native.stream_file") + +} // namespace boost::corosio diff --git a/test/unit/native/native_tcp_acceptor.cpp b/test/unit/native/native_tcp_acceptor.cpp index 898d0f676..5c361a880 100644 --- a/test/unit/native/native_tcp_acceptor.cpp +++ b/test/unit/native/native_tcp_acceptor.cpp @@ -8,9 +8,17 @@ // #include +#include #include #include +#include +#include + +#include +#include +#include + #include "context.hpp" #include "test_suite.hpp" @@ -19,6 +27,20 @@ namespace boost::corosio { template struct native_tcp_acceptor_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().accept( + std::declval())), + decltype(std::declval().accept( + std::declval()))>, + "native_tcp_acceptor::accept must shadow tcp_acceptor::accept"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_tcp_acceptor::wait must shadow tcp_acceptor::wait"); + void testAcceptorConstruct() { io_context ctx(Backend); @@ -57,11 +79,53 @@ struct native_tcp_acceptor_test BOOST_TEST(base.is_open()); } + // Exercise the shadowed wait() awaitable: wait_type::read on a + // listening acceptor resolves when a connection arrives. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + native_tcp_acceptor acc(ioc); + acc.open(); + acc.set_option(native_socket_option::reuse_address(true)); + auto bec = acc.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + auto port = acc.local_endpoint().port(); + + native_tcp_socket client(ioc); + client.open(); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await acc.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto connector = [&]() -> capy::task<> { + auto [ec] = co_await client.connect( + endpoint(ipv4_address::loopback(), port)); + (void)ec; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(connector()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testAcceptorConstruct(); testAcceptorMoveConstruct(); testAcceptorPolymorphicSlice(); + testWait(); } }; diff --git a/test/unit/native/native_tcp_socket.cpp b/test/unit/native/native_tcp_socket.cpp index 2aeda892b..fa188433c 100644 --- a/test/unit/native/native_tcp_socket.cpp +++ b/test/unit/native/native_tcp_socket.cpp @@ -8,7 +8,17 @@ // #include +#include #include +#include + +#include +#include +#include + +#include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -18,6 +28,38 @@ namespace boost::corosio { template struct native_tcp_socket_test { + // Shadow-engagement checks: the native overload must return a + // distinct awaitable type from the base. If a shadow is broken + // (e.g. signature drift, missing override), these fail at compile + // time. The check is unevaluated; no runtime cost. + static_assert( + !std::is_same_v< + decltype(std::declval&>().read_some( + std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_tcp_socket::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().write_some( + std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_tcp_socket::write_some must shadow io_stream::write_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().connect( + std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_tcp_socket::connect must shadow tcp_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_tcp_socket::wait must shadow tcp_socket::wait"); + void testSocketConstruct() { io_context ctx(Backend); @@ -57,11 +99,37 @@ struct native_tcp_socket_test BOOST_TEST_PASS(); } + // Exercise the shadowed wait() awaitable. On a connected socket + // wait_type::write resolves immediately on every backend (IOCP + // matches asio's "writable is always ready" semantics). + void testWait() + { + io_context ioc(Backend); + auto [s1, s2] = test::make_socket_pair< + native_tcp_socket, + native_tcp_acceptor>(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await s1.wait(wait_type::write); + wait_ec = ec; + wait_done = true; + }; + capy::run_async(ioc.get_executor())(waiter()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testSocketConstruct(); testSocketMoveConstruct(); testSocketPolymorphicSlice(); + testWait(); } }; diff --git a/test/unit/native/native_timer.cpp b/test/unit/native/native_timer.cpp index 13bd214cd..e0e29387d 100644 --- a/test/unit/native/native_timer.cpp +++ b/test/unit/native/native_timer.cpp @@ -14,6 +14,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -23,6 +25,12 @@ namespace boost::corosio { template struct native_timer_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait()), + decltype(std::declval().wait())>, + "native_timer::wait must shadow io_timer::wait"); + void testTimerConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_udp_socket.cpp b/test/unit/native/native_udp_socket.cpp index 790236383..410d6df6d 100644 --- a/test/unit/native/native_udp_socket.cpp +++ b/test/unit/native/native_udp_socket.cpp @@ -18,6 +18,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -27,6 +29,52 @@ namespace boost::corosio { template struct native_udp_socket_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().send_to( + std::declval(), + std::declval())), + decltype(std::declval().send_to( + std::declval(), + std::declval()))>, + "native_udp_socket::send_to must shadow udp_socket::send_to"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().recv_from( + std::declval(), + std::declval())), + decltype(std::declval().recv_from( + std::declval(), + std::declval()))>, + "native_udp_socket::recv_from must shadow udp_socket::recv_from"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().connect( + std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_udp_socket::connect must shadow udp_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().send( + std::declval())), + decltype(std::declval().send( + std::declval()))>, + "native_udp_socket::send must shadow udp_socket::send"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().recv( + std::declval())), + decltype(std::declval().recv( + std::declval()))>, + "native_udp_socket::recv must shadow udp_socket::recv"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_udp_socket::wait must shadow udp_socket::wait"); + void testConstruct() { io_context ctx(Backend); @@ -179,6 +227,85 @@ struct native_udp_socket_test ioc.run(); } + void testSendRecvConnected() + { + io_context ioc(Backend); + + native_udp_socket a(ioc); + native_udp_socket b(ioc); + + b.open(); + auto ec = b.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST_EQ(ec, std::error_code{}); + auto b_ep = b.local_endpoint(); + + auto task = [](native_udp_socket& a, + native_udp_socket& b, + endpoint dest) -> capy::task<> { + auto [ec1] = co_await a.connect(dest); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST(a.is_open()); + + char const msg[] = "native connected"; + auto [ec2, n2] = + co_await a.send(capy::const_buffer(msg, sizeof(msg))); + BOOST_TEST_EQ(ec2, std::error_code{}); + BOOST_TEST_EQ(n2, sizeof(msg)); + + char buf[64] = {}; + endpoint source; + auto [ec3, n3] = co_await b.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(ec3, std::error_code{}); + BOOST_TEST_EQ(n3, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "native connected"), 0); + + auto [ec4] = co_await b.connect(source); + BOOST_TEST_EQ(ec4, std::error_code{}); + + char const reply[] = "native reply"; + auto [ec5, n5] = + co_await b.send(capy::const_buffer(reply, sizeof(reply))); + BOOST_TEST_EQ(ec5, std::error_code{}); + + char buf2[64] = {}; + auto [ec6, n6] = + co_await a.recv(capy::mutable_buffer(buf2, sizeof(buf2))); + BOOST_TEST_EQ(ec6, std::error_code{}); + BOOST_TEST_EQ(n6, sizeof(reply)); + BOOST_TEST_EQ(std::strcmp(buf2, "native reply"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(a, b, b_ep)); + ioc.run(); + } + + void testConnectAutoOpen() + { + io_context ioc(Backend); + + native_udp_socket receiver(ioc); + receiver.open(); + auto ec = receiver.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST_EQ(ec, std::error_code{}); + auto recv_ep = receiver.local_endpoint(); + + native_udp_socket sender(ioc); + BOOST_TEST_EQ(sender.is_open(), false); + + auto task = [](native_udp_socket& s, + endpoint dest) -> capy::task<> { + auto [ec] = co_await s.connect(dest); + BOOST_TEST_EQ(ec, std::error_code{}); + BOOST_TEST(s.is_open()); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(sender, recv_ep)); + ioc.run(); + } + void testVirtualDispatchFallback() { // Verify that calling through udp_socket& uses virtual dispatch @@ -217,15 +344,59 @@ struct native_udp_socket_test ioc.run(); } + // Exercise the shadowed wait() awaitable: wait_type::read on a + // bound UDP socket resolves when a datagram arrives from a peer. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + native_udp_socket recv(ioc); + recv.open(udp::v4()); + auto bec = recv.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto port = recv.local_endpoint().port(); + + native_udp_socket send(ioc); + send.open(udp::v4()); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await recv.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto sender = [&]() -> capy::task<> { + char dg[1] = {'X'}; + auto [ec, n] = co_await send.send_to( + capy::const_buffer(dg, sizeof(dg)), + endpoint(ipv4_address::loopback(), port)); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(sender()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testConstruct(); testMoveConstruct(); testPolymorphicSlice(); testSendRecvLoopback(); + testSendRecvConnected(); + testConnectAutoOpen(); testCancelRecv(); testCloseWhileRecving(); testVirtualDispatchFallback(); + testWait(); } }; From 4fd8b0bfd8d67fd76f977f958a8c725b9956d88f Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Tue, 26 May 2026 20:18:07 +0200 Subject: [PATCH 2/2] test(timer): remove timing dependency from expires_at/after waiter-count tests Replace the 10ms delay timer that sequenced waiter registration before the reset with explicit poll() drains. The previous structure relied on the IOCP scheduler dispatching the two t.wait() suspensions before the delay timer expired; on Windows release builds the order isn't guaranteed and the test would intermittently observe zero canceled waiters. --- test/unit/timer.cpp | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp index fd805b00f..3ae1797fb 100644 --- a/test/unit/timer.cpp +++ b/test/unit/timer.cpp @@ -743,13 +743,11 @@ struct timer_test { io_context ioc(Backend); timer t(ioc); - timer delay(ioc); bool w1 = false, w2 = false; std::error_code ec1, ec2; t.expires_after(std::chrono::seconds(60)); - delay.expires_after(std::chrono::milliseconds(10)); auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> { @@ -758,18 +756,19 @@ struct timer_test done = true; }; - std::size_t expires_count = 0; - auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> { - (void)co_await delay_ref.wait(); - expires_count = t_ref.expires_at( - timer::clock_type::now() + std::chrono::seconds(30)); - }; - capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); - capy::run_async(ioc.get_executor())(reset_task(delay, t)); - ioc.run_for(std::chrono::milliseconds(100)); + // Drain so both waiters suspend on t.wait() and register. + while (ioc.poll() > 0) + ; + + auto expires_count = t.expires_at( + timer::clock_type::now() + std::chrono::seconds(30)); + + // Drain so the canceled completions reach the coroutines. + while (ioc.poll() > 0) + ; BOOST_TEST_EQ(expires_count, 2u); BOOST_TEST(w1); @@ -782,13 +781,11 @@ struct timer_test { io_context ioc(Backend); timer t(ioc); - timer delay(ioc); bool w1 = false, w2 = false; std::error_code ec1, ec2; t.expires_after(std::chrono::seconds(60)); - delay.expires_after(std::chrono::milliseconds(10)); auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> { @@ -797,17 +794,18 @@ struct timer_test done = true; }; - std::size_t expires_count = 0; - auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> { - (void)co_await delay_ref.wait(); - expires_count = t_ref.expires_after(std::chrono::seconds(30)); - }; - capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); - capy::run_async(ioc.get_executor())(reset_task(delay, t)); - ioc.run_for(std::chrono::milliseconds(100)); + // Drain so both waiters suspend on t.wait() and register. + while (ioc.poll() > 0) + ; + + auto expires_count = t.expires_after(std::chrono::seconds(30)); + + // Drain so the canceled completions reach the coroutines. + while (ioc.poll() > 0) + ; BOOST_TEST_EQ(expires_count, 2u); BOOST_TEST(w1);