diff --git a/include/boost/corosio/backend.hpp b/include/boost/corosio/backend.hpp index e1d52ee09..ea14c1a91 100644 --- a/include/boost/corosio/backend.hpp +++ b/include/boost/corosio/backend.hpp @@ -45,6 +45,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -71,6 +75,11 @@ struct epoll_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -103,6 +112,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -129,6 +142,11 @@ struct select_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -161,6 +179,10 @@ class posix_signal; class posix_signal_service; class posix_resolver; class posix_resolver_service; +class posix_stream_file; +class posix_stream_file_service; +class posix_random_access_file; +class posix_random_access_file_service; } // namespace detail @@ -187,6 +209,11 @@ struct kqueue_t using resolver_type = detail::posix_resolver; using resolver_service_type = detail::posix_resolver_service; + using stream_file_type = detail::posix_stream_file; + using stream_file_service_type = detail::posix_stream_file_service; + using random_access_file_type = detail::posix_random_access_file; + using random_access_file_service_type = detail::posix_random_access_file_service; + /// Create the scheduler and services for this backend. BOOST_COROSIO_DECL static detail::scheduler& construct(capy::execution_context&, unsigned concurrency_hint); @@ -260,6 +287,11 @@ struct iocp_t using resolver_type = detail::win_resolver; using resolver_service_type = detail::win_resolver_service; + using stream_file_type = detail::win_stream_file; + using stream_file_service_type = detail::win_file_service; + using random_access_file_type = detail::win_random_access_file; + using random_access_file_service_type = detail::win_random_access_file_service; + /** Create the scheduler and services for this backend. @param ctx The execution context that owns the scheduler. diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp index c4a32b06d..9cc628981 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp index ff76ea075..3bb98852a 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp @@ -1101,18 +1101,6 @@ win_tcp_service::native_handle() const noexcept return iocp_; } -inline LPFN_CONNECTEX -win_tcp_service::connect_ex() const noexcept -{ - return connect_ex_; -} - -inline LPFN_ACCEPTEX -win_tcp_service::accept_ex() const noexcept -{ - return accept_ex_; -} - inline void win_tcp_service::post(overlapped_op* op) { diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp index 15237606c..38e818503 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp @@ -152,10 +152,16 @@ class BOOST_COROSIO_DECL win_tcp_service final void* native_handle() const noexcept; /** Return the ConnectEx function pointer. */ - LPFN_CONNECTEX connect_ex() const noexcept; + LPFN_CONNECTEX connect_ex() const noexcept + { + return connect_ex_; + } /** Return the AcceptEx function pointer. */ - LPFN_ACCEPTEX accept_ex() const noexcept; + LPFN_ACCEPTEX accept_ex() const noexcept + { + return accept_ex_; + } /** Post an overlapped operation for completion. */ void post(overlapped_op* op); diff --git a/include/boost/corosio/native/native.hpp b/include/boost/corosio/native/native.hpp index 5c8b3aeae..a5b277e3c 100644 --- a/include/boost/corosio/native/native.hpp +++ b/include/boost/corosio/native/native.hpp @@ -19,10 +19,16 @@ #include #include +#include +#include +#include +#include #include #include +#include #include #include #include +#include #endif diff --git a/include/boost/corosio/native/native_local_datagram_socket.hpp b/include/boost/corosio/native/native_local_datagram_socket.hpp new file mode 100644 index 000000000..e603d6161 --- /dev/null +++ b/include/boost/corosio/native/native_local_datagram_socket.hpp @@ -0,0 +1,489 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix datagram socket with devirtualized I/O. + + This class template inherits from @ref local_datagram_socket + and shadows the async operations (`send_to`, `recv_from`, + `connect`, `send`, `recv`) with versions that call the backend + implementation directly, allowing the compiler to inline + through the entire call chain. + + Non-async operations (`open`, `close`, `cancel`, `bind`, + socket options) remain unchanged and dispatch through the + compiled library. + + A `native_local_datagram_socket` IS-A `local_datagram_socket` + and can be passed to any function expecting + `local_datagram_socket&`, in which case virtual dispatch is + used transparently. + + @tparam Backend A backend tag value (e.g., `epoll`) whose type + provides the concrete implementation types. + + @par Thread Safety + Same as @ref local_datagram_socket. + + @par Example + @code + #include + + native_io_context ctx; + native_local_datagram_socket s(ctx); + s.open(); + s.bind(local_endpoint("/tmp/recv.sock")); + char buf[1024]; + local_endpoint sender; + auto [ec, n] = co_await s.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), sender); + @endcode + + @see local_datagram_socket, epoll_t, iocp_t +*/ +template +class native_local_datagram_socket : public local_datagram_socket +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_datagram_socket_type; + using service_type = typename backend_type::local_datagram_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_send_to_awaitable + { + native_local_datagram_socket& self_; + ConstBufferSequence buffers_; + corosio::local_endpoint dest_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_to_awaitable( + native_local_datagram_socket& self, + ConstBufferSequence buffers, + corosio::local_endpoint dest, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , dest_(dest) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send_to( + h, env->executor, buffers_, dest_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_from_awaitable + { + native_local_datagram_socket& self_; + MutableBufferSequence buffers_; + corosio::local_endpoint& source_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_from_awaitable( + native_local_datagram_socket& self, + MutableBufferSequence buffers, + corosio::local_endpoint& source, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , source_(source) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv_from( + h, env->executor, buffers_, &source_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + struct native_wait_awaitable + { + native_local_datagram_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_datagram_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_local_datagram_socket& self_; + corosio::local_endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable( + native_local_datagram_socket& self, + corosio::local_endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + + template + struct native_send_awaitable + { + native_local_datagram_socket& self_; + ConstBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_awaitable( + native_local_datagram_socket& self, + ConstBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_awaitable + { + native_local_datagram_socket& self_; + MutableBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_awaitable( + native_local_datagram_socket& self, + MutableBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native socket from an execution context. + + @param ctx The execution context that will own this socket. + */ + explicit native_local_datagram_socket(capy::execution_context& ctx) + : local_datagram_socket(create_handle(ctx)) + { + } + + /** Construct a native socket from an executor. + + @param ex The executor whose context will own the socket. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_datagram_socket>) && + capy::Executor + explicit native_local_datagram_socket(Ex const& ex) + : native_local_datagram_socket(ex.context()) + { + } + + /// Move construct. + native_local_datagram_socket(native_local_datagram_socket&&) noexcept = + default; + + /// Move assign. + native_local_datagram_socket& + operator=(native_local_datagram_socket&&) noexcept = default; + + native_local_datagram_socket(native_local_datagram_socket const&) = delete; + native_local_datagram_socket& + operator=(native_local_datagram_socket const&) = delete; + + /** Send a datagram to the specified destination. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::send_to. + */ + template + auto send_to( + CB const& buffers, + corosio::local_endpoint dest, + corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send_to: socket not open"); + return native_send_to_awaitable( + *this, buffers, dest, static_cast(flags)); + } + + /// @overload + template + auto send_to(CB const& buffers, corosio::local_endpoint dest) + { + return send_to(buffers, dest, corosio::message_flags::none); + } + + /** Receive a datagram and capture the sender's endpoint. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::recv_from. + */ + template + auto recv_from( + MB const& buffers, + corosio::local_endpoint& source, + corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv_from: socket not open"); + return native_recv_from_awaitable( + *this, buffers, source, static_cast(flags)); + } + + /// @overload + template + auto recv_from(MB const& buffers, corosio::local_endpoint& source) + { + return recv_from(buffers, source, corosio::message_flags::none); + } + + /** Asynchronously connect to set the default peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::connect. + + If the socket is not already open, it is opened automatically. + */ + auto connect(corosio::local_endpoint ep) + { + if (!is_open()) + open(); + return native_connect_awaitable(*this, ep); + } + + /** Send a datagram to the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::send. + */ + template + auto send(CB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send: socket not open"); + return native_send_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto send(CB const& buffers) + { + return send(buffers, corosio::message_flags::none); + } + + /** Receive a datagram from the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::recv. + */ + template + auto recv(MB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv: socket not open"); + return native_recv_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto recv(MB const& buffers) + { + return recv(buffers, corosio::message_flags::none); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_datagram_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_DATAGRAM_SOCKET_HPP diff --git a/include/boost/corosio/native/native_local_stream_acceptor.hpp b/include/boost/corosio/native/native_local_stream_acceptor.hpp new file mode 100644 index 000000000..963ba3780 --- /dev/null +++ b/include/boost/corosio/native/native_local_stream_acceptor.hpp @@ -0,0 +1,287 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP + +#include +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix stream acceptor with devirtualized accept. + + This class template inherits from @ref local_stream_acceptor + and shadows both `accept` overloads (the peer-reference form + and the move-return form) with versions that call the backend + implementation directly, allowing the compiler to inline + through the entire call chain. The move-return form yields a + @ref native_local_stream_socket so subsequent I/O on the peer + is also devirtualized. + + Non-async operations (`listen`, `close`, `cancel`) remain + unchanged and dispatch through the compiled library. + + A `native_local_stream_acceptor` IS-A `local_stream_acceptor` + and can be passed to any function expecting + `local_stream_acceptor&`. + + @tparam Backend A backend tag value (e.g., `epoll`). + + @par Thread Safety + Same as @ref local_stream_acceptor. + + @see local_stream_acceptor, epoll_t, iocp_t +*/ +template +class native_local_stream_acceptor : public local_stream_acceptor +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_stream_acceptor_type; + using service_type = + typename backend_type::local_stream_acceptor_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + struct native_wait_awaitable + { + native_local_stream_acceptor& acc_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_stream_acceptor& acc, wait_type w) noexcept + : acc_(acc) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_accept_awaitable + { + native_local_stream_acceptor& acc_; + local_stream_socket& peer_; + std::stop_token token_; + mutable std::error_code ec_; + mutable io_object::implementation* peer_impl_ = nullptr; + + native_accept_awaitable( + native_local_stream_acceptor& acc, + local_stream_socket& peer) noexcept + : acc_(acc) + , peer_(peer) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + if (!ec_) + acc_.reset_peer_impl(peer_, peer_impl_); + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().accept( + h, env->executor, token_, &ec_, &peer_impl_); + } + }; + + struct native_move_accept_awaitable + { + native_local_stream_acceptor& acc_; + std::stop_token token_; + mutable std::error_code ec_; + mutable io_object::implementation* peer_impl_ = nullptr; + + explicit native_move_accept_awaitable( + native_local_stream_acceptor& acc) noexcept + : acc_(acc) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result> + await_resume() const noexcept + { + if (token_.stop_requested()) + return { + make_error_code(std::errc::operation_canceled), + native_local_stream_socket(acc_.context())}; + if (ec_ || !peer_impl_) + return { + ec_, + native_local_stream_socket(acc_.context())}; + + native_local_stream_socket peer(acc_.context()); + acc_.reset_peer_impl(peer, peer_impl_); + return {ec_, std::move(peer)}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().accept( + h, env->executor, token_, &ec_, &peer_impl_); + } + }; + +public: + /** Construct a native acceptor from an execution context. + + @param ctx The execution context that will own this acceptor. + */ + explicit native_local_stream_acceptor(capy::execution_context& ctx) + : local_stream_acceptor(create_handle(ctx), ctx) + { + } + + /** Construct a native acceptor from an executor. + + @param ex The executor whose context will own the acceptor. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_stream_acceptor>) && + capy::Executor + explicit native_local_stream_acceptor(Ex const& ex) + : native_local_stream_acceptor(ex.context()) + { + } + + /// Move construct. + native_local_stream_acceptor(native_local_stream_acceptor&&) noexcept = + default; + + /// Move assign. + native_local_stream_acceptor& + operator=(native_local_stream_acceptor&&) noexcept = default; + + native_local_stream_acceptor(native_local_stream_acceptor const&) = delete; + native_local_stream_acceptor& + operator=(native_local_stream_acceptor const&) = delete; + + /** Asynchronously accept an incoming connection. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_acceptor::accept. + + @param peer The socket to receive the accepted connection. + + @return An awaitable yielding `io_result<>`. + + @throws std::logic_error if the acceptor is not listening. + + Both this acceptor and @p peer must outlive the returned + awaitable. + */ + auto accept(local_stream_socket& peer) + { + if (!is_open()) + detail::throw_logic_error("accept: acceptor not listening"); + return native_accept_awaitable(*this, peer); + } + + /** Asynchronously accept an incoming connection, returning the peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. The accepted peer is returned as a + @ref native_local_stream_socket so that subsequent I/O on it + is also devirtualized. + + @return An awaitable yielding + `io_result>`. + + @throws std::logic_error if the acceptor is not listening. + + This acceptor must outlive the returned awaitable. + */ + auto accept() + { + if (!is_open()) + detail::throw_logic_error("accept: acceptor not listening"); + return native_move_accept_awaitable(*this); + } + + /** Asynchronously wait for the acceptor to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_acceptor::wait. + + @param w The wait direction (typically `wait_type::read`). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_ACCEPTOR_HPP diff --git a/include/boost/corosio/native/native_local_stream_socket.hpp b/include/boost/corosio/native/native_local_stream_socket.hpp new file mode 100644 index 000000000..9bf2eeaef --- /dev/null +++ b/include/boost/corosio/native/native_local_stream_socket.hpp @@ -0,0 +1,332 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL +#include +#endif + +#if BOOST_COROSIO_HAS_SELECT +#include +#endif + +#if BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** An asynchronous Unix stream socket with devirtualized I/O operations. + + This class template inherits from @ref local_stream_socket and + shadows the async operations (`read_some`, `write_some`, + `connect`) with versions that call the backend implementation + directly, allowing the compiler to inline through the entire + call chain. + + Non-async operations (`open`, `close`, `cancel`, socket options) + remain unchanged and dispatch through the compiled library. + + A `native_local_stream_socket` IS-A `local_stream_socket` and + can be passed to any function expecting `local_stream_socket&` + or `io_stream&`, in which case virtual dispatch is used + transparently. + + @tparam Backend A backend tag value (e.g., `epoll`) whose type + provides the concrete implementation types. + + @par Thread Safety + Same as @ref local_stream_socket. + + @par Example + @code + #include + + native_io_context ctx; + native_local_stream_socket s(ctx); + s.open(); + auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock")); + @endcode + + @see local_stream_socket, epoll_t, iocp_t +*/ +template +class native_local_stream_socket : public local_stream_socket +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::local_stream_socket_type; + using service_type = typename backend_type::local_stream_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_awaitable + { + native_local_stream_socket& self_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_awaitable( + native_local_stream_socket& self, + MutableBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_awaitable + { + native_local_stream_socket& self_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_awaitable( + native_local_stream_socket& self, + ConstBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + struct native_wait_awaitable + { + native_local_stream_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable( + native_local_stream_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_local_stream_socket& self_; + corosio::local_endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable( + native_local_stream_socket& self, + corosio::local_endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + +public: + /** Construct a native socket from an execution context. + + @param ctx The execution context that will own this socket. + */ + explicit native_local_stream_socket(capy::execution_context& ctx) + : io_object(create_handle(ctx)) + { + } + + /** Construct a native socket from an executor. + + @param ex The executor whose context will own the socket. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_local_stream_socket>) && + capy::Executor + explicit native_local_stream_socket(Ex const& ex) + : native_local_stream_socket(ex.context()) + { + } + + /// Move construct. + native_local_stream_socket(native_local_stream_socket&&) noexcept = default; + + /// Move assign. + native_local_stream_socket& + operator=(native_local_stream_socket&&) noexcept = default; + + native_local_stream_socket(native_local_stream_socket const&) = delete; + native_local_stream_socket& + operator=(native_local_stream_socket const&) = delete; + + /** Asynchronously read data from the socket. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::read_some. + + @param buffers The buffer sequence to read into. + + @return An awaitable yielding `(error_code, std::size_t)`. + */ + template + auto read_some(MB const& buffers) + { + return native_read_awaitable(*this, buffers); + } + + /** Asynchronously write data to the socket. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::write_some. + + @param buffers The buffer sequence to write from. + + @return An awaitable yielding `(error_code, std::size_t)`. + */ + template + auto write_some(CB const& buffers) + { + return native_write_awaitable(*this, buffers); + } + + /** Asynchronously connect to a remote endpoint. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_socket::connect. + + If the socket is not already open, it is opened automatically. + + @param ep The local endpoint (path) to connect to. + + @return An awaitable yielding `io_result<>`. + + @throws std::system_error if the socket needs to be opened + and the open fails. + */ + auto connect(corosio::local_endpoint ep) + { + if (!is_open()) + open(); + return native_connect_awaitable(*this, ep); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref local_stream_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP diff --git a/include/boost/corosio/native/native_random_access_file.hpp b/include/boost/corosio/native/native_random_access_file.hpp new file mode 100644 index 000000000..33387dc05 --- /dev/null +++ b/include/boost/corosio/native/native_random_access_file.hpp @@ -0,0 +1,228 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_SELECT || \ + BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** A random-access file with devirtualized async I/O operations. + + This class template inherits from @ref random_access_file and + shadows `read_some_at` / `write_some_at` with versions that + call the backend implementation directly, allowing the compiler + to inline through the entire call chain. + + Non-async operations (`open`, `close`, `size`, `resize`, + `sync_data`, `sync_all`) remain unchanged and dispatch through + the compiled library. + + A `native_random_access_file` IS-A `random_access_file` and + can be passed to any function expecting `random_access_file&`, + in which case virtual dispatch is used transparently. + + @note On POSIX platforms, file I/O is dispatched to a thread + pool regardless of the chosen reactor backend, so all three + reactor tags (`epoll`, `select`, `kqueue`) resolve to the same + underlying implementation. The `Backend` template parameter + exists for API symmetry with @ref native_tcp_socket and friends. + The vtable savings are smaller relative to the thread-pool / + overlapped-I/O cost than they are for socket operations. + + @tparam Backend A backend tag value (e.g., `epoll`, `iocp`). + + @par Thread Safety + Same as @ref random_access_file. + + @par Example + @code + #include + + native_io_context ctx; + native_random_access_file f(ctx); + f.open("data.bin", file_base::read_only); + char buf[4096]; + auto [ec, n] = co_await f.read_some_at( + 0, capy::mutable_buffer(buf, sizeof(buf))); + @endcode + + @see random_access_file, epoll_t, iocp_t +*/ +template +class native_random_access_file : public random_access_file +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::random_access_file_type; + using service_type = + typename backend_type::random_access_file_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_at_awaitable + { + native_random_access_file& self_; + std::uint64_t offset_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_at_awaitable( + native_random_access_file& self, + std::uint64_t offset, + MutableBufferSequence buffers) noexcept + : self_(self) + , offset_(offset) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some_at( + offset_, h, env->executor, buffers_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_at_awaitable + { + native_random_access_file& self_; + std::uint64_t offset_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_at_awaitable( + native_random_access_file& self, + std::uint64_t offset, + ConstBufferSequence buffers) noexcept + : self_(self) + , offset_(offset) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some_at( + offset_, h, env->executor, buffers_, + token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native random-access file from an execution context. + + @param ctx The execution context that will own this file. + */ + explicit native_random_access_file(capy::execution_context& ctx) + : random_access_file(create_handle(ctx)) + { + } + + /** Construct a native random-access file from an executor. + + @param ex The executor whose context will own this file. + */ + template + requires(!std::same_as< + std::remove_cvref_t, + native_random_access_file>) && + capy::Executor + explicit native_random_access_file(Ex const& ex) + : native_random_access_file(ex.context()) + { + } + + /// Move construct. + native_random_access_file(native_random_access_file&&) noexcept = default; + + /// Move assign. + native_random_access_file& + operator=(native_random_access_file&&) noexcept = default; + + native_random_access_file(native_random_access_file const&) = delete; + native_random_access_file& + operator=(native_random_access_file const&) = delete; + + /** Asynchronously read at the given offset. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref random_access_file::read_some_at. + */ + template + auto read_some_at(std::uint64_t offset, MB const& buffers) + { + return native_read_at_awaitable(*this, offset, buffers); + } + + /** Asynchronously write at the given offset. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref random_access_file::write_some_at. + */ + template + auto write_some_at(std::uint64_t offset, CB const& buffers) + { + return native_write_at_awaitable(*this, offset, buffers); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_RANDOM_ACCESS_FILE_HPP diff --git a/include/boost/corosio/native/native_stream_file.hpp b/include/boost/corosio/native/native_stream_file.hpp new file mode 100644 index 000000000..ed1b15e18 --- /dev/null +++ b/include/boost/corosio/native/native_stream_file.hpp @@ -0,0 +1,214 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP +#define BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP + +#include +#include + +#ifndef BOOST_COROSIO_MRDOCS +#if BOOST_COROSIO_HAS_EPOLL || BOOST_COROSIO_HAS_SELECT || \ + BOOST_COROSIO_HAS_KQUEUE +#include +#endif + +#if BOOST_COROSIO_HAS_IOCP +#include +#endif +#endif // !BOOST_COROSIO_MRDOCS + +namespace boost::corosio { + +/** A sequential file with devirtualized async I/O operations. + + This class template inherits from @ref stream_file and shadows + `read_some` / `write_some` with versions that call the backend + implementation directly, allowing the compiler to inline through + the entire call chain. + + Non-async operations (`open`, `close`, `size`, `resize`, `seek`, + `sync_data`, `sync_all`) remain unchanged and dispatch through + the compiled library. + + A `native_stream_file` IS-A `stream_file` and can be passed to + any function expecting `stream_file&` or `io_stream&`, in which + case virtual dispatch is used transparently. + + @note On POSIX platforms, file I/O is dispatched to a thread + pool regardless of the chosen reactor backend, so all three + reactor tags (`epoll`, `select`, `kqueue`) resolve to the same + underlying implementation. The `Backend` template parameter + exists for API symmetry with @ref native_tcp_socket and friends. + The vtable savings are smaller relative to the thread-pool / + overlapped-I/O cost than they are for socket operations. + + @tparam Backend A backend tag value (e.g., `epoll`, `iocp`). + + @par Thread Safety + Same as @ref stream_file. + + @par Example + @code + #include + + native_io_context ctx; + native_stream_file f(ctx); + f.open("data.bin", file_base::read_only); + char buf[4096]; + auto [ec, n] = co_await f.read_some( + capy::mutable_buffer(buf, sizeof(buf))); + @endcode + + @see stream_file, epoll_t, iocp_t +*/ +template +class native_stream_file : public stream_file +{ + using backend_type = decltype(Backend); + using impl_type = typename backend_type::stream_file_type; + using service_type = typename backend_type::stream_file_service_type; + + impl_type& get_impl() noexcept + { + return *static_cast(h_.get()); + } + + template + struct native_read_awaitable + { + native_stream_file& self_; + MutableBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_read_awaitable( + native_stream_file& self, + MutableBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().read_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_write_awaitable + { + native_stream_file& self_; + ConstBufferSequence buffers_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_write_awaitable( + native_stream_file& self, + ConstBufferSequence buffers) noexcept + : self_(self) + , buffers_(std::move(buffers)) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().write_some( + h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); + } + }; + +public: + /** Construct a native stream file from an execution context. + + @param ctx The execution context that will own this file. + */ + explicit native_stream_file(capy::execution_context& ctx) + : io_object(create_handle(ctx)) + { + } + + /** Construct a native stream file from an executor. + + @param ex The executor whose context will own this file. + */ + template + requires(!std::same_as, native_stream_file>) && + capy::Executor + explicit native_stream_file(Ex const& ex) : native_stream_file(ex.context()) + { + } + + /// Move construct. + native_stream_file(native_stream_file&&) noexcept = default; + + /// Move assign. + native_stream_file& operator=(native_stream_file&&) noexcept = default; + + native_stream_file(native_stream_file const&) = delete; + native_stream_file& operator=(native_stream_file const&) = delete; + + /** Asynchronously read data from the file. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::read_some. + */ + template + auto read_some(MB const& buffers) + { + return native_read_awaitable(*this, buffers); + } + + /** Asynchronously write data to the file. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref io_stream::write_some. + */ + template + auto write_some(CB const& buffers) + { + return native_write_awaitable(*this, buffers); + } +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_NATIVE_NATIVE_STREAM_FILE_HPP diff --git a/include/boost/corosio/native/native_tcp_acceptor.hpp b/include/boost/corosio/native/native_tcp_acceptor.hpp index 7546d56f8..75835852b 100644 --- a/include/boost/corosio/native/native_tcp_acceptor.hpp +++ b/include/boost/corosio/native/native_tcp_acceptor.hpp @@ -65,6 +65,40 @@ class native_tcp_acceptor : public tcp_acceptor return *static_cast(h_.get()); } + struct native_wait_awaitable + { + native_tcp_acceptor& acc_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_tcp_acceptor& acc, wait_type w) noexcept + : acc_(acc) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return acc_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + struct native_accept_awaitable { native_tcp_acceptor& acc_; @@ -169,6 +203,20 @@ class native_tcp_acceptor : public tcp_acceptor detail::throw_logic_error("accept: acceptor not listening"); return native_accept_awaitable(*this, peer); } + + /** Asynchronously wait for the acceptor to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref tcp_acceptor::wait. + + @param w The wait direction (typically `wait_type::read`). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/native/native_tcp_socket.hpp b/include/boost/corosio/native/native_tcp_socket.hpp index 03e1e2bf7..94686e996 100644 --- a/include/boost/corosio/native/native_tcp_socket.hpp +++ b/include/boost/corosio/native/native_tcp_socket.hpp @@ -153,6 +153,40 @@ class native_tcp_socket : public tcp_socket } }; + struct native_wait_awaitable + { + native_tcp_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_tcp_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + struct native_connect_awaitable { native_tcp_socket& self_; @@ -293,6 +327,20 @@ class native_tcp_socket : public tcp_socket detail::throw_logic_error("connect: socket not open"); return native_connect_awaitable(*this, ep); } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref tcp_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/native/native_udp_socket.hpp b/include/boost/corosio/native/native_udp_socket.hpp index 805b463f2..c7148892c 100644 --- a/include/boost/corosio/native/native_udp_socket.hpp +++ b/include/boost/corosio/native/native_udp_socket.hpp @@ -36,9 +36,10 @@ namespace boost::corosio { /** An asynchronous UDP socket with devirtualized I/O operations. This class template inherits from @ref udp_socket and shadows - the async operations (`send_to`, `recv_from`) with versions - that call the backend implementation directly, allowing the - compiler to inline through the entire call chain. + the async operations (`send_to`, `recv_from`, `connect`, `send`, + `recv`) with versions that call the backend implementation + directly, allowing the compiler to inline through the entire + call chain. Non-async operations (`open`, `close`, `cancel`, `bind`, socket options) remain unchanged and dispatch through the @@ -172,6 +173,158 @@ class native_udp_socket : public udp_socket } }; + struct native_wait_awaitable + { + native_udp_socket& self_; + wait_type w_; + std::stop_token token_; + mutable std::error_code ec_; + + native_wait_awaitable(native_udp_socket& self, wait_type w) noexcept + : self_(self) + , w_(w) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().wait( + h, env->executor, w_, token_, &ec_); + } + }; + + struct native_connect_awaitable + { + native_udp_socket& self_; + endpoint endpoint_; + std::stop_token token_; + mutable std::error_code ec_; + + native_connect_awaitable(native_udp_socket& self, endpoint ep) noexcept + : self_(self) + , endpoint_(ep) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result<> await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled)}; + return {ec_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().connect( + h, env->executor, endpoint_, token_, &ec_); + } + }; + + template + struct native_send_awaitable + { + native_udp_socket& self_; + ConstBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_send_awaitable( + native_udp_socket& self, + ConstBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().send( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + + template + struct native_recv_awaitable + { + native_udp_socket& self_; + MutableBufferSequence buffers_; + int flags_; + std::stop_token token_; + mutable std::error_code ec_; + mutable std::size_t bytes_transferred_ = 0; + + native_recv_awaitable( + native_udp_socket& self, + MutableBufferSequence buffers, + int flags) noexcept + : self_(self) + , buffers_(std::move(buffers)) + , flags_(flags) + { + } + + bool await_ready() const noexcept + { + return token_.stop_requested(); + } + + capy::io_result await_resume() const noexcept + { + if (token_.stop_requested()) + return {make_error_code(std::errc::operation_canceled), 0}; + return {ec_, bytes_transferred_}; + } + + auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) + -> std::coroutine_handle<> + { + token_ = env->stop_token; + return self_.get_impl().recv( + h, env->executor, buffers_, flags_, + token_, &ec_, &bytes_transferred_); + } + }; + public: /** Construct a native UDP socket from an execution context. @@ -262,6 +415,98 @@ class native_udp_socket : public udp_socket { return recv_from(buffers, source, corosio::message_flags::none); } + + /** Asynchronously connect to set the default peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::connect. + + If the socket is not already open, it is opened automatically + using the address family of @p ep. + + @param ep The remote endpoint to connect to. + + @return An awaitable yielding `io_result<>`. + + @throws std::system_error if the socket needs to be opened + and the open fails. + */ + auto connect(endpoint ep) + { + if (!is_open()) + open(ep.is_v6() ? udp::v6() : udp::v4()); + return native_connect_awaitable(*this, ep); + } + + /** Send a datagram to the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::send. + + @param buffers The buffer sequence containing data to send. + @param flags Message flags. + + @return An awaitable yielding `(error_code, std::size_t)`. + + @throws std::logic_error if the socket is not open. + */ + template + auto send(CB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("send: socket not open"); + return native_send_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto send(CB const& buffers) + { + return send(buffers, corosio::message_flags::none); + } + + /** Receive a datagram from the connected peer. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::recv. + + @param buffers The buffer sequence to receive data into. + @param flags Message flags (e.g. message_flags::peek). + + @return An awaitable yielding `(error_code, std::size_t)`. + + @throws std::logic_error if the socket is not open. + */ + template + auto recv(MB const& buffers, corosio::message_flags flags) + { + if (!is_open()) + detail::throw_logic_error("recv: socket not open"); + return native_recv_awaitable( + *this, buffers, static_cast(flags)); + } + + /// @overload + template + auto recv(MB const& buffers) + { + return recv(buffers, corosio::message_flags::none); + } + + /** Asynchronously wait for the socket to be ready. + + Calls the backend implementation directly, bypassing virtual + dispatch. Otherwise identical to @ref udp_socket::wait. + + @param w The wait direction (read, write, or error). + + @return An awaitable yielding `io_result<>`. + */ + [[nodiscard]] auto wait(wait_type w) + { + return native_wait_awaitable(*this, w); + } }; } // namespace boost::corosio diff --git a/include/boost/corosio/random_access_file.hpp b/include/boost/corosio/random_access_file.hpp index cd7d6ea7a..8e77a2b9d 100644 --- a/include/boost/corosio/random_access_file.hpp +++ b/include/boost/corosio/random_access_file.hpp @@ -361,6 +361,10 @@ class BOOST_COROSIO_DECL random_access_file : public io_object */ void assign(native_handle_type handle); +protected: + /// Construct from a pre-built handle (for native_random_access_file). + explicit random_access_file(handle h) noexcept : io_object(std::move(h)) {} + private: inline implementation& get() const noexcept { diff --git a/include/boost/corosio/stream_file.hpp b/include/boost/corosio/stream_file.hpp index 89f88784a..292908ae2 100644 --- a/include/boost/corosio/stream_file.hpp +++ b/include/boost/corosio/stream_file.hpp @@ -252,6 +252,13 @@ class BOOST_COROSIO_DECL stream_file : public io_stream seek(std::int64_t offset, file_base::seek_basis origin = file_base::seek_set); +protected: + /// Default-construct (for derived types that initialize io_object directly). + stream_file() noexcept = default; + + /// Construct from a pre-built handle (for native_stream_file). + explicit stream_file(handle h) noexcept : io_object(std::move(h)) {} + private: inline implementation& get() const noexcept { diff --git a/include/boost/corosio/test/local_socket_pair.hpp b/include/boost/corosio/test/local_socket_pair.hpp new file mode 100644 index 000000000..9793009c7 --- /dev/null +++ b/include/boost/corosio/test/local_socket_pair.hpp @@ -0,0 +1,142 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_TEST_LOCAL_SOCKET_PAIR_HPP +#define BOOST_COROSIO_TEST_LOCAL_SOCKET_PAIR_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::test { + +/** Create a connected pair of AF_UNIX stream sockets via bind+accept+connect. + + Unlike the library-side @ref make_local_stream_pair (POSIX-only, + socketpair-based), this helper drives the public acceptor API so + it can produce native template wrappers like + `native_local_stream_socket` — the path benchmarks need + to exercise the shadowed read_some/write_some/connect ops. + + @tparam Socket Concrete or native local stream socket type. + @tparam Acceptor Matching acceptor type. + + @param ctx I/O context backing both sockets. + + @return Connected pair `{accepted, connected}`. +*/ +template< + class Socket = local_stream_socket, + class Acceptor = local_stream_acceptor> +std::pair +make_local_stream_pair(io_context& ctx) +{ + namespace fs = std::filesystem; + + static std::random_device rd; + static std::mt19937_64 gen{rd()}; + + std::string path; + for (int attempt = 0; attempt < 16; ++attempt) + { + std::string name = "co_pair_"; + name += std::to_string(gen()); + auto candidate = fs::temp_directory_path() / name; + std::error_code ec; + if (fs::create_directory(candidate, ec)) + { + path = (candidate / "s").string(); + break; + } + } + if (path.empty()) + throw std::runtime_error("make_local_stream_pair: temp path failed"); + + auto ex = ctx.get_executor(); + + Acceptor acc(ctx); + acc.open(); + if (auto ec = acc.bind(local_endpoint(path))) + throw std::runtime_error( + "local_stream_pair bind failed: " + ec.message()); + if (auto ec = acc.listen()) + throw std::runtime_error( + "local_stream_pair listen failed: " + ec.message()); + + Socket s1(ctx); + Socket s2(ctx); + s2.open(); + + std::error_code accept_ec, connect_ec; + bool accept_done = false, connect_done = false; + + capy::run_async(ex)( + [](Acceptor& a, Socket& s, std::error_code& ec_out, + bool& done_out) -> capy::task<> { + auto [ec] = co_await a.accept(s); + ec_out = ec; + done_out = true; + }(acc, s1, accept_ec, accept_done)); + + capy::run_async(ex)( + [](Socket& s, local_endpoint ep, std::error_code& ec_out, + bool& done_out) -> capy::task<> { + auto [ec] = co_await s.connect(ep); + ec_out = ec; + done_out = true; + }(s2, local_endpoint(path), connect_ec, connect_done)); + + ctx.run(); + ctx.restart(); + + // The bind path on disk is no longer needed once accept/connect + // have rendezvoused; remove the file and its parent directory so + // repeated bench invocations don't accumulate cruft under /tmp. + std::error_code rm_ec; + fs::remove(fs::path(path), rm_ec); + fs::remove(fs::path(path).parent_path(), rm_ec); + + if (!accept_done || accept_ec) + { + std::fprintf( + stderr, "local_stream_pair: accept failed (done=%d, ec=%s)\n", + accept_done, accept_ec.message().c_str()); + acc.close(); + throw std::runtime_error("local_stream_pair accept failed"); + } + + if (!connect_done || connect_ec) + { + std::fprintf( + stderr, "local_stream_pair: connect failed (done=%d, ec=%s)\n", + connect_done, connect_ec.message().c_str()); + acc.close(); + s1.close(); + throw std::runtime_error("local_stream_pair connect failed"); + } + + acc.close(); + + return {std::move(s1), std::move(s2)}; +} + +} // namespace boost::corosio::test + +#endif diff --git a/perf/bench/corosio/local_socket_latency_bench.cpp b/perf/bench/corosio/local_socket_latency_bench.cpp index 8b548ddd9..c8d4a6594 100644 --- a/perf/bench/corosio/local_socket_latency_bench.cpp +++ b/perf/bench/corosio/local_socket_latency_bench.cpp @@ -15,7 +15,9 @@ #include #include -#include +#include +#include +#include #include #include #include @@ -36,8 +38,8 @@ namespace { template capy::task<> unix_pingpong_client_task( - corosio::local_stream_socket& client, - corosio::local_stream_socket& server, + corosio::native_local_stream_socket& client, + corosio::native_local_stream_socket& server, std::size_t message_size, bench::state& state) { @@ -76,11 +78,15 @@ template void bench_unix_pingpong_latency(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto message_size = static_cast(state.range(0)); state.counters["message_size"] = static_cast(message_size); corosio::native_io_context ioc; - auto [client, server] = corosio::make_local_stream_pair(ioc); + auto [client, server] = + corosio::test::make_local_stream_pair(ioc); capy::run_async(ioc.get_executor())( unix_pingpong_client_task(client, server, message_size, state)); @@ -104,20 +110,25 @@ template void bench_unix_concurrent_latency(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + int num_pairs = static_cast(state.range(0)); state.counters["num_pairs"] = num_pairs; corosio::native_io_context ioc; - std::vector clients; - std::vector servers; + std::vector clients; + std::vector servers; clients.reserve(num_pairs); servers.reserve(num_pairs); for (int i = 0; i < num_pairs; ++i) { - auto [c, s] = corosio::make_local_stream_pair(ioc); + auto [c, s] = + corosio::test::make_local_stream_pair( + ioc); clients.push_back(std::move(c)); servers.push_back(std::move(s)); } @@ -150,13 +161,17 @@ template void bench_unix_pingpong_latency_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto message_size = static_cast(state.range(0)); state.counters["message_size"] = static_cast(message_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [client, server] = corosio::make_local_stream_pair(ioc); + auto [client, server] = + corosio::test::make_local_stream_pair(ioc); capy::run_async(ioc.get_executor())( unix_pingpong_client_task(client, server, message_size, state)); @@ -180,6 +195,9 @@ template void bench_unix_concurrent_latency_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + int num_pairs = static_cast(state.range(0)); state.counters["num_pairs"] = num_pairs; @@ -187,15 +205,17 @@ bench_unix_concurrent_latency_lockless(bench::state& state) opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - std::vector clients; - std::vector servers; + std::vector clients; + std::vector servers; clients.reserve(num_pairs); servers.reserve(num_pairs); for (int i = 0; i < num_pairs; ++i) { - auto [c, s] = corosio::make_local_stream_pair(ioc); + auto [c, s] = + corosio::test::make_local_stream_pair( + ioc); clients.push_back(std::move(c)); servers.push_back(std::move(s)); } diff --git a/perf/bench/corosio/local_socket_throughput_bench.cpp b/perf/bench/corosio/local_socket_throughput_bench.cpp index f57555bd8..4bacffecc 100644 --- a/perf/bench/corosio/local_socket_throughput_bench.cpp +++ b/perf/bench/corosio/local_socket_throughput_bench.cpp @@ -15,7 +15,9 @@ #include #include -#include +#include +#include +#include #include #include #include @@ -35,11 +37,15 @@ template void bench_unix_throughput(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::native_io_context ioc; - auto [writer, reader] = corosio::make_local_stream_pair(ioc); + auto [writer, reader] = + corosio::test::make_local_stream_pair(ioc); std::vector write_buf(chunk_size, 'x'); std::vector read_buf(chunk_size); @@ -93,11 +99,15 @@ template void bench_unix_bidirectional_throughput(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::native_io_context ioc; - auto [sock1, sock2] = corosio::make_local_stream_pair(ioc); + auto [sock1, sock2] = + corosio::test::make_local_stream_pair(ioc); std::vector buf1(chunk_size, 'a'); std::vector buf2(chunk_size, 'b'); @@ -178,13 +188,17 @@ template void bench_unix_throughput_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [writer, reader] = corosio::make_local_stream_pair(ioc); + auto [writer, reader] = + corosio::test::make_local_stream_pair(ioc); std::vector write_buf(chunk_size, 'x'); std::vector read_buf(chunk_size); @@ -238,13 +252,17 @@ template void bench_unix_bidirectional_throughput_lockless(bench::state& state) { + using socket_type = corosio::native_local_stream_socket; + using acceptor_type = corosio::native_local_stream_acceptor; + auto chunk_size = static_cast(state.range(0)); state.counters["chunk_size"] = static_cast(chunk_size); corosio::io_context_options opts; opts.single_threaded = true; corosio::native_io_context ioc(opts, 1); - auto [sock1, sock2] = corosio::make_local_stream_pair(ioc); + auto [sock1, sock2] = + corosio::test::make_local_stream_pair(ioc); std::vector buf1(chunk_size, 'a'); std::vector buf2(chunk_size, 'b'); diff --git a/src/corosio/src/local_stream_acceptor.cpp b/src/corosio/src/local_stream_acceptor.cpp index 753521940..0378e58c9 100644 --- a/src/corosio/src/local_stream_acceptor.cpp +++ b/src/corosio/src/local_stream_acceptor.cpp @@ -16,6 +16,14 @@ #if BOOST_COROSIO_POSIX #include +#else +// Windows: AF_UNIX socket files are reparse points with tag +// IO_REPARSE_TAG_AF_UNIX. DeleteFileA reliably removes them; +// std::filesystem::remove via libstdc++/MS STL was observed to +// silently leave them in place on at least some Windows hosts, +// so call the Win32 API directly. +#define WIN32_LEAN_AND_MEAN +#include #endif namespace boost::corosio { @@ -51,22 +59,20 @@ local_stream_acceptor::bind(corosio::local_endpoint ep, bind_option opt) if (!is_open()) detail::throw_logic_error("bind: acceptor not open"); -#if BOOST_COROSIO_POSIX if (opt == bind_option::unlink_existing && !ep.empty() && !ep.is_abstract()) { - // Best-effort removal; ENOENT is fine. + // Best-effort removal; missing file is fine. auto p = ep.path(); - // path() is not null-terminated for the fixed buffer, - // so copy to a local array for unlink. char buf[local_endpoint::max_path_length + 1]; std::memcpy(buf, p.data(), p.size()); buf[p.size()] = '\0'; +#if BOOST_COROSIO_POSIX ::unlink(buf); - } #else - (void)opt; + ::DeleteFileA(buf); #endif + } auto& svc = static_cast(h_.service()); diff --git a/test/unit/local_datagram_socket.cpp b/test/unit/local_datagram_socket.cpp index b87341381..1f8076867 100644 --- a/test/unit/local_datagram_socket.cpp +++ b/test/unit/local_datagram_socket.cpp @@ -12,6 +12,10 @@ #include +// AF_UNIX SOCK_DGRAM is POSIX-only in practice. Windows added AF_UNIX +// in Win10 1803 but never SOCK_DGRAM over it, so WSASocket fails on +// the very first open() and every test in this file would throw. +// Keep the entire suite POSIX-gated until Windows kernel support lands. #if BOOST_COROSIO_POSIX #include @@ -23,36 +27,14 @@ #include #include -#include -#include - #include "context.hpp" +#include "local_temp.hpp" #include "test_suite.hpp" namespace boost::corosio { -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace +using test::make_temp_socket_path; +using test::cleanup_temp_socket; template struct local_datagram_socket_test @@ -145,7 +127,7 @@ struct local_datagram_socket_test auto ec = sock.bind(local_endpoint(path)); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); + cleanup_temp_socket(path); } void testSendToRecvFrom() @@ -212,8 +194,8 @@ struct local_datagram_socket_test // Source endpoint should be the sender's bound path BOOST_TEST_EQ(source.path(), path1); - cleanup_path(path1); - cleanup_path(path2); + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); } void testBindFailure() @@ -520,8 +502,8 @@ struct local_datagram_socket_test BOOST_TEST_EQ(src1.path(), path1); BOOST_TEST_EQ(src2.path(), path1); - cleanup_path(path1); - cleanup_path(path2); + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); } void run() @@ -547,8 +529,4 @@ COROSIO_BACKEND_TESTS( } // namespace boost::corosio -#else // !BOOST_COROSIO_POSIX - -// Empty on non-POSIX platforms - -#endif +#endif // BOOST_COROSIO_POSIX diff --git a/test/unit/local_stream_socket.cpp b/test/unit/local_stream_socket.cpp index 8ee995255..0468e99aa 100644 --- a/test/unit/local_stream_socket.cpp +++ b/test/unit/local_stream_socket.cpp @@ -11,9 +11,6 @@ #include #include - -#if BOOST_COROSIO_POSIX - #include #include #include @@ -32,9 +29,12 @@ #include #include +#if BOOST_COROSIO_POSIX #include +#endif #include "context.hpp" +#include "local_temp.hpp" #include "test_suite.hpp" namespace boost::corosio { @@ -44,28 +44,8 @@ namespace boost::corosio { static_assert(capy::ReadStream); static_assert(capy::WriteStream); -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace +using test::make_temp_socket_path; +using test::cleanup_temp_socket; template struct local_stream_socket_test @@ -140,7 +120,7 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); + cleanup_temp_socket(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -191,7 +171,7 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); + cleanup_temp_socket(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -200,6 +180,8 @@ struct local_stream_socket_test BOOST_TEST_EQ(!connect_ec, true); } +#if BOOST_COROSIO_POSIX + // Uses make_local_stream_pair, which is socketpair-based and POSIX-only. void testReadWrite() { io_context ioc(Backend); @@ -255,6 +237,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(s1.is_open(), true); BOOST_TEST_EQ(s2.is_open(), true); } +#endif // BOOST_COROSIO_POSIX void testUnlinkExisting() { @@ -286,7 +269,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(!ec, true); } - cleanup_path(path); + cleanup_temp_socket(path); } void testUnlinkNonexistent() @@ -302,7 +285,7 @@ struct local_stream_socket_test local_endpoint(path), bind_option::unlink_existing); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); + cleanup_temp_socket(path); } void testEndpointOrdering() @@ -342,16 +325,22 @@ struct local_stream_socket_test testMove(); testConnectAccept(); testMoveAccept(); +#if BOOST_COROSIO_POSIX testReadWrite(); testSocketPair(); +#endif testUnlinkExisting(); testUnlinkNonexistent(); testEndpointOrdering(); testEndpointStreamOutput(); +#if BOOST_COROSIO_POSIX testAvailable(); testRelease(); +#endif } +#if BOOST_COROSIO_POSIX + // Uses make_local_stream_pair, which is socketpair-based and POSIX-only. void testAvailable() { io_context ioc(Backend); @@ -378,7 +367,13 @@ struct local_stream_socket_test BOOST_TEST_EQ(done, true); BOOST_TEST_EQ(s2.available(), std::strlen(msg)); } +#endif // BOOST_COROSIO_POSIX +#if BOOST_COROSIO_POSIX + // Exercises raw POSIX fd ops (::write, ::close) on the released + // descriptor. The released-handle semantics are tested via the + // platform helpers; skipped on Windows because the analogous + // path needs send/closesocket and isn't yet factored. void testRelease() { io_context ioc(Backend); @@ -395,6 +390,7 @@ struct local_stream_socket_test BOOST_TEST_EQ(::write(fd, msg, std::strlen(msg)) > 0, true); ::close(fd); } +#endif void testEndpointStreamOutput() { @@ -429,9 +425,3 @@ COROSIO_BACKEND_TESTS( local_stream_socket_test, "boost.corosio.local_stream_socket") } // namespace boost::corosio - -#else // !BOOST_COROSIO_POSIX - -// Empty on non-POSIX platforms - -#endif diff --git a/test/unit/local_temp.hpp b/test/unit/local_temp.hpp new file mode 100644 index 000000000..cc4b4d1c1 --- /dev/null +++ b/test/unit/local_temp.hpp @@ -0,0 +1,68 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +/* Portable temporary path helpers for Unix-domain socket tests. + + Replaces the POSIX-only mkdtemp/unlink/rmdir pattern with + std::filesystem so tests can run on Windows (which supports + AF_UNIX since Windows 10 build 17061). + + Paths are kept short — AF_UNIX limits sun_path to ~108 bytes + on POSIX and ~108 bytes on Windows. The "co_t_" prefix and + hex random suffix keep the total well under that limit on + typical Windows installations. +*/ + +#ifndef BOOST_COROSIO_TEST_LOCAL_TEMP_HPP +#define BOOST_COROSIO_TEST_LOCAL_TEMP_HPP + +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::test { + +inline std::string +make_temp_socket_path(std::string_view prefix = "co_t_") +{ + namespace fs = std::filesystem; + + static std::random_device rd; + static std::mt19937_64 gen{rd()}; + + for (int attempt = 0; attempt < 16; ++attempt) + { + std::string name(prefix); + name += std::to_string(gen()); + + auto dir = fs::temp_directory_path() / name; + + std::error_code ec; + if (fs::create_directory(dir, ec)) + return (dir / "s").string(); + } + throw std::runtime_error("failed to create temp socket directory"); +} + +inline void +cleanup_temp_socket(std::string const& path) noexcept +{ + namespace fs = std::filesystem; + std::error_code ec; + fs::path p(path); + fs::remove(p, ec); + fs::remove(p.parent_path(), ec); +} + +} // namespace boost::corosio::test + +#endif diff --git a/test/unit/native/native_local_datagram_socket.cpp b/test/unit/native/native_local_datagram_socket.cpp new file mode 100644 index 000000000..7dab48906 --- /dev/null +++ b/test/unit/native/native_local_datagram_socket.cpp @@ -0,0 +1,313 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +// AF_UNIX SOCK_DGRAM is POSIX-only in practice. Windows added AF_UNIX +// in Win10 1803 but never SOCK_DGRAM over it, so WSASocket fails on +// the very first open() and every test in this file would throw. +#if BOOST_COROSIO_POSIX + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "context.hpp" +#include "local_temp.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +using test::make_temp_socket_path; +using test::cleanup_temp_socket; + +template +struct native_local_datagram_socket_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .send_to( + std::declval(), + std::declval())), + decltype(std::declval().send_to( + std::declval(), + std::declval()))>, + "native_local_datagram_socket::send_to must shadow local_datagram_socket::send_to"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .recv_from( + std::declval(), + std::declval())), + decltype(std::declval().recv_from( + std::declval(), + std::declval()))>, + "native_local_datagram_socket::recv_from must shadow local_datagram_socket::recv_from"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .connect(std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_local_datagram_socket::connect must shadow local_datagram_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .send(std::declval())), + decltype(std::declval().send( + std::declval()))>, + "native_local_datagram_socket::send must shadow local_datagram_socket::send"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .recv(std::declval())), + decltype(std::declval().recv( + std::declval()))>, + "native_local_datagram_socket::recv must shadow local_datagram_socket::recv"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .wait(wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_datagram_socket::wait must shadow local_datagram_socket::wait"); + + void testConstruct() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testOpen() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + s.open(); + BOOST_TEST(s.is_open()); + s.close(); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + native_local_datagram_socket s(ioc); + s.open(); + local_datagram_socket& base = s; + BOOST_TEST(base.is_open()); + } + + void testSendToRecvFrom() + { + io_context ioc(Backend); + auto path1 = make_temp_socket_path(); + auto path2 = make_temp_socket_path(); + + native_local_datagram_socket sender(ioc); + native_local_datagram_socket receiver(ioc); + sender.open(); + receiver.open(); + + auto ec1 = sender.bind(local_endpoint(path1)); + auto ec2 = receiver.bind(local_endpoint(path2)); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST_EQ(ec2, std::error_code{}); + + auto task = + [](native_local_datagram_socket& s, + native_local_datagram_socket& r, + local_endpoint dest) -> capy::task<> { + char const msg[] = "native dgram"; + auto [sec, sn] = + co_await s.send_to(capy::const_buffer(msg, sizeof(msg)), dest); + BOOST_TEST_EQ(sec, std::error_code{}); + BOOST_TEST_EQ(sn, sizeof(msg)); + + char buf[64] = {}; + local_endpoint source; + auto [rec, rn] = co_await r.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(rn, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "native dgram"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(sender, receiver, local_endpoint(path2))); + ioc.run(); + + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); + } + + void testSendRecvConnected() + { + io_context ioc(Backend); + auto path_a = make_temp_socket_path(); + auto path_b = make_temp_socket_path(); + + native_local_datagram_socket a(ioc); + native_local_datagram_socket b(ioc); + a.open(); + b.open(); + + auto eca = a.bind(local_endpoint(path_a)); + auto ecb = b.bind(local_endpoint(path_b)); + BOOST_TEST_EQ(eca, std::error_code{}); + BOOST_TEST_EQ(ecb, std::error_code{}); + + auto task = + [](native_local_datagram_socket& a, + native_local_datagram_socket& b, + local_endpoint a_to_b, + local_endpoint b_to_a) -> capy::task<> { + auto [ec1] = co_await a.connect(a_to_b); + BOOST_TEST_EQ(ec1, std::error_code{}); + auto [ec2] = co_await b.connect(b_to_a); + BOOST_TEST_EQ(ec2, std::error_code{}); + + char const msg[] = "connected dgram"; + auto [sec, sn] = + co_await a.send(capy::const_buffer(msg, sizeof(msg))); + BOOST_TEST_EQ(sec, std::error_code{}); + BOOST_TEST_EQ(sn, sizeof(msg)); + + char buf[64] = {}; + auto [rec, rn] = + co_await b.recv(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(rn, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "connected dgram"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task( + a, b, local_endpoint(path_b), local_endpoint(path_a))); + ioc.run(); + + cleanup_temp_socket(path_a); + cleanup_temp_socket(path_b); + } + + void testVirtualDispatchFallback() + { + io_context ioc(Backend); + auto path1 = make_temp_socket_path(); + auto path2 = make_temp_socket_path(); + + native_local_datagram_socket sender(ioc); + native_local_datagram_socket receiver(ioc); + sender.open(); + receiver.open(); + + auto ec1 = sender.bind(local_endpoint(path1)); + auto ec2 = receiver.bind(local_endpoint(path2)); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST_EQ(ec2, std::error_code{}); + + local_datagram_socket& s_ref = sender; + local_datagram_socket& r_ref = receiver; + + auto task = [](local_datagram_socket& s, local_datagram_socket& r, + local_endpoint dest) -> capy::task<> { + char const msg[] = "virtual"; + auto [sec, sn] = + co_await s.send_to(capy::const_buffer(msg, sizeof(msg)), dest); + BOOST_TEST_EQ(sec, std::error_code{}); + + char buf[64] = {}; + local_endpoint source; + auto [rec, rn] = co_await r.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::strcmp(buf, "virtual"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(s_ref, r_ref, local_endpoint(path2))); + ioc.run(); + + cleanup_temp_socket(path1); + cleanup_temp_socket(path2); + } + + // Exercise the shadowed wait() awaitable: wait_type::read on a + // bound datagram socket resolves when a datagram arrives. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto rx_path = test::make_temp_socket_path(); + + native_local_datagram_socket recv(ioc); + recv.open(); + auto bec = recv.bind(local_endpoint(rx_path)); + BOOST_TEST(!bec); + + native_local_datagram_socket send(ioc); + send.open(); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await recv.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto sender = [&]() -> capy::task<> { + char dg[1] = {'X'}; + auto [ec, n] = co_await send.send_to( + capy::const_buffer(dg, sizeof(dg)), + local_endpoint(rx_path)); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(sender()); + ioc.run(); + + test::cleanup_temp_socket(rx_path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + void run() + { + testConstruct(); + testOpen(); + testPolymorphicSlice(); + testSendToRecvFrom(); + testSendRecvConnected(); + testVirtualDispatchFallback(); + testWait(); + } +}; + +COROSIO_BACKEND_TESTS( + native_local_datagram_socket_test, + "boost.corosio.native.local_datagram_socket") + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_POSIX diff --git a/test/unit/native/native_local_stream_socket.cpp b/test/unit/native/native_local_stream_socket.cpp new file mode 100644 index 000000000..0db9a2166 --- /dev/null +++ b/test/unit/native/native_local_stream_socket.cpp @@ -0,0 +1,359 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include "context.hpp" +#include "local_temp.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +using test::make_temp_socket_path; +using test::cleanup_temp_socket; + +template +struct native_local_stream_socket_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .read_some(std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_local_stream_socket::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .write_some(std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_local_stream_socket::write_some must shadow io_stream::write_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .connect(std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_local_stream_socket::connect must shadow local_stream_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .accept(std::declval())), + decltype(std::declval().accept( + std::declval()))>, + "native_local_stream_acceptor::accept(peer) must shadow local_stream_acceptor::accept(peer)"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .accept()), + decltype(std::declval().accept())>, + "native_local_stream_acceptor::accept() must shadow local_stream_acceptor::accept()"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_stream_socket::wait must shadow local_stream_socket::wait"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .wait(wait_type::read)), + decltype(std::declval().wait( + wait_type::read))>, + "native_local_stream_acceptor::wait must shadow local_stream_acceptor::wait"); + + void testConstruct() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testOpen() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + s.open(); + BOOST_TEST(s.is_open()); + s.close(); + BOOST_TEST_EQ(s.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + native_local_stream_socket s(ioc); + s.open(); + local_stream_socket& base = s; + BOOST_TEST(base.is_open()); + } + + void testConnectAcceptReadWrite() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + auto acceptor_task = + [](native_local_stream_acceptor& a, + native_local_stream_socket& s) -> capy::task<> { + auto [ec] = co_await a.accept(s); + BOOST_TEST_EQ(ec, std::error_code{}); + + char buf[64] = {}; + auto [rec, n] = + co_await s.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("native unix")); + }; + + auto client_task = [](native_local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "native unix"; + auto [wec, n] = + co_await c.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(wec, std::error_code{}); + BOOST_TEST_EQ(n, sizeof(msg) - 1); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc, server)); + capy::run_async(ex)(client_task(client, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + void testMoveAccept() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket client(ioc); + + auto acceptor_task = + [](native_local_stream_acceptor& a) -> capy::task<> { + auto [ec, peer] = co_await a.accept(); + BOOST_TEST_EQ(ec, std::error_code{}); + BOOST_TEST(peer.is_open()); + + char buf[64] = {}; + auto [rec, n] = + co_await peer.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("move accept")); + }; + + auto client_task = [](native_local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "move accept"; + auto [wec, n] = + co_await c.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(wec, std::error_code{}); + BOOST_TEST_EQ(n, sizeof(msg) - 1); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc)); + capy::run_async(ex)(client_task(client, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + void testVirtualDispatchFallback() + { + io_context ioc(Backend); + auto path = make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto ec = acc.bind(local_endpoint(path)); + BOOST_TEST_EQ(ec, std::error_code{}); + ec = acc.listen(); + BOOST_TEST_EQ(ec, std::error_code{}); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + local_stream_acceptor& acc_ref = acc; + local_stream_socket& server_ref = server; + local_stream_socket& client_ref = client; + + auto acceptor_task = [](local_stream_acceptor& a, + local_stream_socket& s) -> capy::task<> { + auto [ec] = co_await a.accept(s); + BOOST_TEST_EQ(ec, std::error_code{}); + + char buf[64] = {}; + auto [rec, n] = + co_await s.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(rec, std::error_code{}); + BOOST_TEST_EQ(std::string(buf, n), std::string("virtual")); + }; + + auto client_task = [](local_stream_socket& c, + local_endpoint ep) -> capy::task<> { + auto [ec] = co_await c.connect(ep); + BOOST_TEST_EQ(ec, std::error_code{}); + + char const msg[] = "virtual"; + (void)co_await c.write_some( + capy::const_buffer(msg, sizeof(msg) - 1)); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(acceptor_task(acc_ref, server_ref)); + capy::run_async(ex)(client_task(client_ref, local_endpoint(path))); + ioc.run(); + + cleanup_temp_socket(path); + } + + // Exercise the shadowed wait() awaitable on the socket: a + // connected stream is always writable, so wait_type::write + // resolves immediately on every backend. + void testSocketWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto path = test::make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto bec = acc.bind(local_endpoint(path)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + + native_local_stream_socket server(ioc); + native_local_stream_socket client(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto rendezvous = [&]() -> capy::task<> { + auto [ec] = co_await acc.accept(server); + (void)ec; + }; + auto connect_task = [&]() -> capy::task<> { + auto [ec] = co_await client.connect(local_endpoint(path)); + (void)ec; + }; + capy::run_async(ex)(rendezvous()); + capy::run_async(ex)(connect_task()); + ioc.run(); + ioc.restart(); + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await client.wait(wait_type::write); + wait_ec = ec; + wait_done = true; + }; + capy::run_async(ex)(waiter()); + ioc.run(); + + test::cleanup_temp_socket(path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + // Exercise the shadowed wait() awaitable on the acceptor: + // wait_type::read resolves once a client connects. + void testAcceptorWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto path = test::make_temp_socket_path(); + + native_local_stream_acceptor acc(ioc); + acc.open(); + auto bec = acc.bind(local_endpoint(path)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + + native_local_stream_socket client(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await acc.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto connect_task = [&]() -> capy::task<> { + auto [ec] = co_await client.connect(local_endpoint(path)); + (void)ec; + }; + capy::run_async(ex)(waiter()); + capy::run_async(ex)(connect_task()); + ioc.run(); + + test::cleanup_temp_socket(path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + void run() + { + testConstruct(); + testOpen(); + testPolymorphicSlice(); + testConnectAcceptReadWrite(); + testMoveAccept(); + testVirtualDispatchFallback(); + testSocketWait(); + testAcceptorWait(); + } +}; + +COROSIO_BACKEND_TESTS( + native_local_stream_socket_test, "boost.corosio.native.local_stream_socket") + +} // namespace boost::corosio diff --git a/test/unit/native/native_random_access_file.cpp b/test/unit/native/native_random_access_file.cpp new file mode 100644 index 000000000..4b45ee32b --- /dev/null +++ b/test/unit/native/native_random_access_file.cpp @@ -0,0 +1,205 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +namespace { + +struct temp_file +{ + std::filesystem::path path; + + explicit temp_file(std::string_view prefix = "corosio_native_raf_") + { + // Per-process random_device-seeded RNG so concurrent test + // processes (e.g. ctest --parallel running .epoll and .select + // variants of the same suite) don't collide on identical paths. + static thread_local std::mt19937_64 gen{std::random_device{}()}; + path = std::filesystem::temp_directory_path() + / (std::string(prefix) + std::to_string(gen())); + } + + temp_file(std::string_view prefix, std::string_view contents) + : temp_file(prefix) + { + std::ofstream ofs(path, std::ios::binary); + ofs.write( + contents.data(), + static_cast(contents.size())); + } + + ~temp_file() + { + std::error_code ec; + std::filesystem::remove(path, ec); + } + + temp_file(temp_file const&) = delete; + temp_file& operator=(temp_file const&) = delete; +}; + +} // namespace + +template +struct native_random_access_file_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .read_some_at( + std::uint64_t{0}, + std::declval())), + decltype(std::declval().read_some_at( + std::uint64_t{0}, + std::declval()))>, + "native_random_access_file::read_some_at must shadow random_access_file::read_some_at"); + static_assert( + !std::is_same_v< + decltype(std::declval&>() + .write_some_at( + std::uint64_t{0}, + std::declval())), + decltype(std::declval().write_some_at( + std::uint64_t{0}, + std::declval()))>, + "native_random_access_file::write_some_at must shadow random_access_file::write_some_at"); + + void testConstruct() + { + io_context ioc(Backend); + native_random_access_file f(ioc); + BOOST_TEST_EQ(f.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + temp_file tmp("native_raf_slice_", "x"); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + random_access_file& base = f; + BOOST_TEST(base.is_open()); + } + + void testReadSomeAt() + { + std::string data = "ABCDEFGHIJ"; + temp_file tmp("native_raf_read_", data); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + char buf[5] = {}; + std::size_t n_out = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await f.read_some_at( + 3, capy::mutable_buffer(buf, 5)); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, 5u); + BOOST_TEST_EQ(std::memcmp(buf, "DEFGH", 5), 0); + } + + void testWriteSomeAt() + { + temp_file tmp("native_raf_write_"); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open( + tmp.path, + file_base::read_write | file_base::create | file_base::truncate); + + std::size_t written = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await f.write_some_at( + 0, capy::const_buffer("hello", 5)); + BOOST_TEST_EQ(ec, std::error_code{}); + written = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(written, 5u); + + f.close(); + std::ifstream ifs(tmp.path, std::ios::binary); + std::string contents( + (std::istreambuf_iterator(ifs)), + std::istreambuf_iterator()); + BOOST_TEST_EQ(contents, std::string("hello")); + } + + void testVirtualDispatchFallback() + { + std::string data = "fallback"; + temp_file tmp("native_raf_fb_", data); + + io_context ioc(Backend); + native_random_access_file f(ioc); + f.open(tmp.path, file_base::read_only); + + random_access_file& base = f; + + char buf[64] = {}; + std::size_t n_out = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await base.read_some_at( + 0, capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + BOOST_TEST_EQ(std::memcmp(buf, data.data(), data.size()), 0); + } + + void run() + { + testConstruct(); + testPolymorphicSlice(); + testReadSomeAt(); + testWriteSomeAt(); + testVirtualDispatchFallback(); + } +}; + +COROSIO_BACKEND_TESTS( + native_random_access_file_test, + "boost.corosio.native.random_access_file") + +} // namespace boost::corosio diff --git a/test/unit/native/native_resolver.cpp b/test/unit/native/native_resolver.cpp index 51a7789c9..4969357c8 100644 --- a/test/unit/native/native_resolver.cpp +++ b/test/unit/native/native_resolver.cpp @@ -13,6 +13,10 @@ #include #include +#include +#include +#include + #include "context.hpp" #include "test_suite.hpp" @@ -21,6 +25,25 @@ namespace boost::corosio { template struct native_resolver_test { + // resolve(host, service) - forward resolution + static_assert( + !std::is_same_v< + decltype(std::declval&>().resolve( + std::declval(), + std::declval())), + decltype(std::declval().resolve( + std::declval(), + std::declval()))>, + "native_resolver::resolve(host, service) must shadow resolver::resolve"); + // resolve(endpoint) - reverse resolution + static_assert( + !std::is_same_v< + decltype(std::declval&>().resolve( + std::declval())), + decltype(std::declval().resolve( + std::declval()))>, + "native_resolver::resolve(endpoint) must shadow resolver::resolve"); + void testResolverConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_signal_set.cpp b/test/unit/native/native_signal_set.cpp index 269c1e30d..2a32b7335 100644 --- a/test/unit/native/native_signal_set.cpp +++ b/test/unit/native/native_signal_set.cpp @@ -11,6 +11,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -20,6 +22,12 @@ namespace boost::corosio { template struct native_signal_set_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait()), + decltype(std::declval().wait())>, + "native_signal_set::wait must shadow io_signal_set::wait"); + void testSignalSetConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_stream_file.cpp b/test/unit/native/native_stream_file.cpp new file mode 100644 index 000000000..fc9771137 --- /dev/null +++ b/test/unit/native/native_stream_file.cpp @@ -0,0 +1,199 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +namespace { + +struct temp_file +{ + std::filesystem::path path; + + explicit temp_file(std::string_view prefix = "corosio_native_sf_") + { + // Per-process random_device-seeded RNG so concurrent test + // processes (e.g. ctest --parallel running .epoll and .select + // variants of the same suite) don't collide on identical paths. + static thread_local std::mt19937_64 gen{std::random_device{}()}; + path = std::filesystem::temp_directory_path() + / (std::string(prefix) + std::to_string(gen())); + } + + temp_file(std::string_view prefix, std::string_view contents) + : temp_file(prefix) + { + std::ofstream ofs(path, std::ios::binary); + ofs.write( + contents.data(), + static_cast(contents.size())); + } + + ~temp_file() + { + std::error_code ec; + std::filesystem::remove(path, ec); + } + + temp_file(temp_file const&) = delete; + temp_file& operator=(temp_file const&) = delete; +}; + +} // namespace + +template +struct native_stream_file_test +{ + static_assert( + !std::is_same_v< + decltype(std::declval&>().read_some( + std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_stream_file::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().write_some( + std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_stream_file::write_some must shadow io_stream::write_some"); + + void testConstruct() + { + io_context ioc(Backend); + native_stream_file f(ioc); + BOOST_TEST_EQ(f.is_open(), false); + } + + void testPolymorphicSlice() + { + io_context ioc(Backend); + temp_file tmp("native_sf_slice_", "x"); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + stream_file& base = f; + BOOST_TEST(base.is_open()); + } + + void testReadSome() + { + std::string data = "hello native"; + temp_file tmp("native_sf_read_", data); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + char buf[64] = {}; + std::size_t n_out = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = + co_await f.read_some(capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + BOOST_TEST_EQ(std::memcmp(buf, data.data(), data.size()), 0); + } + + void testWriteSome() + { + temp_file tmp("native_sf_write_"); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open( + tmp.path, + file_base::write_only | file_base::create | file_base::truncate); + + char const msg[] = "native write"; + std::size_t written = 0; + + auto task = [&]() -> capy::task<> { + auto [ec, n] = + co_await f.write_some(capy::const_buffer(msg, sizeof(msg) - 1)); + BOOST_TEST_EQ(ec, std::error_code{}); + written = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(written, sizeof(msg) - 1); + + f.close(); + std::ifstream ifs(tmp.path, std::ios::binary); + std::string contents( + (std::istreambuf_iterator(ifs)), + std::istreambuf_iterator()); + BOOST_TEST_EQ(contents, std::string(msg)); + } + + void testVirtualDispatchFallback() + { + std::string data = "fallback"; + temp_file tmp("native_sf_fb_", data); + + io_context ioc(Backend); + native_stream_file f(ioc); + f.open(tmp.path, file_base::read_only); + + stream_file& base = f; + + char buf[64] = {}; + std::size_t n_out = 0; + auto task = [&]() -> capy::task<> { + auto [ec, n] = co_await base.read_some( + capy::mutable_buffer(buf, sizeof(buf))); + BOOST_TEST_EQ(ec, std::error_code{}); + n_out = n; + }; + capy::run_async(ioc.get_executor())(task()); + ioc.run(); + + BOOST_TEST_EQ(n_out, data.size()); + } + + void run() + { + testConstruct(); + testPolymorphicSlice(); + testReadSome(); + testWriteSome(); + testVirtualDispatchFallback(); + } +}; + +COROSIO_BACKEND_TESTS( + native_stream_file_test, "boost.corosio.native.stream_file") + +} // namespace boost::corosio diff --git a/test/unit/native/native_tcp_acceptor.cpp b/test/unit/native/native_tcp_acceptor.cpp index 898d0f676..5c361a880 100644 --- a/test/unit/native/native_tcp_acceptor.cpp +++ b/test/unit/native/native_tcp_acceptor.cpp @@ -8,9 +8,17 @@ // #include +#include #include #include +#include +#include + +#include +#include +#include + #include "context.hpp" #include "test_suite.hpp" @@ -19,6 +27,20 @@ namespace boost::corosio { template struct native_tcp_acceptor_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().accept( + std::declval())), + decltype(std::declval().accept( + std::declval()))>, + "native_tcp_acceptor::accept must shadow tcp_acceptor::accept"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_tcp_acceptor::wait must shadow tcp_acceptor::wait"); + void testAcceptorConstruct() { io_context ctx(Backend); @@ -57,11 +79,53 @@ struct native_tcp_acceptor_test BOOST_TEST(base.is_open()); } + // Exercise the shadowed wait() awaitable: wait_type::read on a + // listening acceptor resolves when a connection arrives. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + native_tcp_acceptor acc(ioc); + acc.open(); + acc.set_option(native_socket_option::reuse_address(true)); + auto bec = acc.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + auto port = acc.local_endpoint().port(); + + native_tcp_socket client(ioc); + client.open(); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await acc.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto connector = [&]() -> capy::task<> { + auto [ec] = co_await client.connect( + endpoint(ipv4_address::loopback(), port)); + (void)ec; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(connector()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testAcceptorConstruct(); testAcceptorMoveConstruct(); testAcceptorPolymorphicSlice(); + testWait(); } }; diff --git a/test/unit/native/native_tcp_socket.cpp b/test/unit/native/native_tcp_socket.cpp index 2aeda892b..fa188433c 100644 --- a/test/unit/native/native_tcp_socket.cpp +++ b/test/unit/native/native_tcp_socket.cpp @@ -8,7 +8,17 @@ // #include +#include #include +#include + +#include +#include +#include + +#include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -18,6 +28,38 @@ namespace boost::corosio { template struct native_tcp_socket_test { + // Shadow-engagement checks: the native overload must return a + // distinct awaitable type from the base. If a shadow is broken + // (e.g. signature drift, missing override), these fail at compile + // time. The check is unevaluated; no runtime cost. + static_assert( + !std::is_same_v< + decltype(std::declval&>().read_some( + std::declval())), + decltype(std::declval().read_some( + std::declval()))>, + "native_tcp_socket::read_some must shadow io_stream::read_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().write_some( + std::declval())), + decltype(std::declval().write_some( + std::declval()))>, + "native_tcp_socket::write_some must shadow io_stream::write_some"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().connect( + std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_tcp_socket::connect must shadow tcp_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_tcp_socket::wait must shadow tcp_socket::wait"); + void testSocketConstruct() { io_context ctx(Backend); @@ -57,11 +99,37 @@ struct native_tcp_socket_test BOOST_TEST_PASS(); } + // Exercise the shadowed wait() awaitable. On a connected socket + // wait_type::write resolves immediately on every backend (IOCP + // matches asio's "writable is always ready" semantics). + void testWait() + { + io_context ioc(Backend); + auto [s1, s2] = test::make_socket_pair< + native_tcp_socket, + native_tcp_acceptor>(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await s1.wait(wait_type::write); + wait_ec = ec; + wait_done = true; + }; + capy::run_async(ioc.get_executor())(waiter()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testSocketConstruct(); testSocketMoveConstruct(); testSocketPolymorphicSlice(); + testWait(); } }; diff --git a/test/unit/native/native_timer.cpp b/test/unit/native/native_timer.cpp index 13bd214cd..e0e29387d 100644 --- a/test/unit/native/native_timer.cpp +++ b/test/unit/native/native_timer.cpp @@ -14,6 +14,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -23,6 +25,12 @@ namespace boost::corosio { template struct native_timer_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait()), + decltype(std::declval().wait())>, + "native_timer::wait must shadow io_timer::wait"); + void testTimerConstruct() { io_context ctx(Backend); diff --git a/test/unit/native/native_udp_socket.cpp b/test/unit/native/native_udp_socket.cpp index 790236383..410d6df6d 100644 --- a/test/unit/native/native_udp_socket.cpp +++ b/test/unit/native/native_udp_socket.cpp @@ -18,6 +18,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -27,6 +29,52 @@ namespace boost::corosio { template struct native_udp_socket_test { + static_assert( + !std::is_same_v< + decltype(std::declval&>().send_to( + std::declval(), + std::declval())), + decltype(std::declval().send_to( + std::declval(), + std::declval()))>, + "native_udp_socket::send_to must shadow udp_socket::send_to"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().recv_from( + std::declval(), + std::declval())), + decltype(std::declval().recv_from( + std::declval(), + std::declval()))>, + "native_udp_socket::recv_from must shadow udp_socket::recv_from"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().connect( + std::declval())), + decltype(std::declval().connect( + std::declval()))>, + "native_udp_socket::connect must shadow udp_socket::connect"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().send( + std::declval())), + decltype(std::declval().send( + std::declval()))>, + "native_udp_socket::send must shadow udp_socket::send"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().recv( + std::declval())), + decltype(std::declval().recv( + std::declval()))>, + "native_udp_socket::recv must shadow udp_socket::recv"); + static_assert( + !std::is_same_v< + decltype(std::declval&>().wait( + wait_type::read)), + decltype(std::declval().wait(wait_type::read))>, + "native_udp_socket::wait must shadow udp_socket::wait"); + void testConstruct() { io_context ctx(Backend); @@ -179,6 +227,85 @@ struct native_udp_socket_test ioc.run(); } + void testSendRecvConnected() + { + io_context ioc(Backend); + + native_udp_socket a(ioc); + native_udp_socket b(ioc); + + b.open(); + auto ec = b.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST_EQ(ec, std::error_code{}); + auto b_ep = b.local_endpoint(); + + auto task = [](native_udp_socket& a, + native_udp_socket& b, + endpoint dest) -> capy::task<> { + auto [ec1] = co_await a.connect(dest); + BOOST_TEST_EQ(ec1, std::error_code{}); + BOOST_TEST(a.is_open()); + + char const msg[] = "native connected"; + auto [ec2, n2] = + co_await a.send(capy::const_buffer(msg, sizeof(msg))); + BOOST_TEST_EQ(ec2, std::error_code{}); + BOOST_TEST_EQ(n2, sizeof(msg)); + + char buf[64] = {}; + endpoint source; + auto [ec3, n3] = co_await b.recv_from( + capy::mutable_buffer(buf, sizeof(buf)), source); + BOOST_TEST_EQ(ec3, std::error_code{}); + BOOST_TEST_EQ(n3, sizeof(msg)); + BOOST_TEST_EQ(std::strcmp(buf, "native connected"), 0); + + auto [ec4] = co_await b.connect(source); + BOOST_TEST_EQ(ec4, std::error_code{}); + + char const reply[] = "native reply"; + auto [ec5, n5] = + co_await b.send(capy::const_buffer(reply, sizeof(reply))); + BOOST_TEST_EQ(ec5, std::error_code{}); + + char buf2[64] = {}; + auto [ec6, n6] = + co_await a.recv(capy::mutable_buffer(buf2, sizeof(buf2))); + BOOST_TEST_EQ(ec6, std::error_code{}); + BOOST_TEST_EQ(n6, sizeof(reply)); + BOOST_TEST_EQ(std::strcmp(buf2, "native reply"), 0); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(a, b, b_ep)); + ioc.run(); + } + + void testConnectAutoOpen() + { + io_context ioc(Backend); + + native_udp_socket receiver(ioc); + receiver.open(); + auto ec = receiver.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST_EQ(ec, std::error_code{}); + auto recv_ep = receiver.local_endpoint(); + + native_udp_socket sender(ioc); + BOOST_TEST_EQ(sender.is_open(), false); + + auto task = [](native_udp_socket& s, + endpoint dest) -> capy::task<> { + auto [ec] = co_await s.connect(dest); + BOOST_TEST_EQ(ec, std::error_code{}); + BOOST_TEST(s.is_open()); + }; + + auto ex = ioc.get_executor(); + capy::run_async(ex)(task(sender, recv_ep)); + ioc.run(); + } + void testVirtualDispatchFallback() { // Verify that calling through udp_socket& uses virtual dispatch @@ -217,15 +344,59 @@ struct native_udp_socket_test ioc.run(); } + // Exercise the shadowed wait() awaitable: wait_type::read on a + // bound UDP socket resolves when a datagram arrives from a peer. + void testWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + native_udp_socket recv(ioc); + recv.open(udp::v4()); + auto bec = recv.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto port = recv.local_endpoint().port(); + + native_udp_socket send(ioc); + send.open(udp::v4()); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await recv.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto sender = [&]() -> capy::task<> { + char dg[1] = {'X'}; + auto [ec, n] = co_await send.send_to( + capy::const_buffer(dg, sizeof(dg)), + endpoint(ipv4_address::loopback(), port)); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(sender()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + void run() { testConstruct(); testMoveConstruct(); testPolymorphicSlice(); testSendRecvLoopback(); + testSendRecvConnected(); + testConnectAutoOpen(); testCancelRecv(); testCloseWhileRecving(); testVirtualDispatchFallback(); + testWait(); } }; diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp index fd805b00f..3ae1797fb 100644 --- a/test/unit/timer.cpp +++ b/test/unit/timer.cpp @@ -743,13 +743,11 @@ struct timer_test { io_context ioc(Backend); timer t(ioc); - timer delay(ioc); bool w1 = false, w2 = false; std::error_code ec1, ec2; t.expires_after(std::chrono::seconds(60)); - delay.expires_after(std::chrono::milliseconds(10)); auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> { @@ -758,18 +756,19 @@ struct timer_test done = true; }; - std::size_t expires_count = 0; - auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> { - (void)co_await delay_ref.wait(); - expires_count = t_ref.expires_at( - timer::clock_type::now() + std::chrono::seconds(30)); - }; - capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); - capy::run_async(ioc.get_executor())(reset_task(delay, t)); - ioc.run_for(std::chrono::milliseconds(100)); + // Drain so both waiters suspend on t.wait() and register. + while (ioc.poll() > 0) + ; + + auto expires_count = t.expires_at( + timer::clock_type::now() + std::chrono::seconds(30)); + + // Drain so the canceled completions reach the coroutines. + while (ioc.poll() > 0) + ; BOOST_TEST_EQ(expires_count, 2u); BOOST_TEST(w1); @@ -782,13 +781,11 @@ struct timer_test { io_context ioc(Backend); timer t(ioc); - timer delay(ioc); bool w1 = false, w2 = false; std::error_code ec1, ec2; t.expires_after(std::chrono::seconds(60)); - delay.expires_after(std::chrono::milliseconds(10)); auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> { @@ -797,17 +794,18 @@ struct timer_test done = true; }; - std::size_t expires_count = 0; - auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> { - (void)co_await delay_ref.wait(); - expires_count = t_ref.expires_after(std::chrono::seconds(30)); - }; - capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); - capy::run_async(ioc.get_executor())(reset_task(delay, t)); - ioc.run_for(std::chrono::milliseconds(100)); + // Drain so both waiters suspend on t.wait() and register. + while (ioc.poll() > 0) + ; + + auto expires_count = t.expires_after(std::chrono::seconds(30)); + + // Drain so the canceled completions reach the coroutines. + while (ioc.poll() > 0) + ; BOOST_TEST_EQ(expires_count, 2u); BOOST_TEST(w1);