diff --git a/include/beman/execution/detail/bulk.hpp b/include/beman/execution/detail/bulk.hpp index f12e292f..12b3f4ec 100644 --- a/include/beman/execution/detail/bulk.hpp +++ b/include/beman/execution/detail/bulk.hpp @@ -8,165 +8,276 @@ #ifdef BEMAN_HAS_IMPORT_STD import std; #else -#include #include #include +#include #include #include #endif #ifdef BEMAN_HAS_MODULES import beman.execution.detail.basic_sender; import beman.execution.detail.completion_signatures; -import beman.execution.detail.completion_signatures_for; +import beman.execution.detail.completion_signatures_of_t; import beman.execution.detail.default_impls; -import beman.execution.detail.get_completion_signatures; +import beman.execution.detail.execution_policy; +import beman.execution.detail.forward_like; import beman.execution.detail.get_domain_early; -import beman.execution.detail.impls_for; import beman.execution.detail.make_sender; import beman.execution.detail.meta.combine; import beman.execution.detail.meta.unique; -import beman.execution.detail.movable_value; import beman.execution.detail.product_type; import beman.execution.detail.sender; import beman.execution.detail.sender_adaptor_closure; +import beman.execution.detail.sender_for; import beman.execution.detail.set_error; import beman.execution.detail.set_value; import beman.execution.detail.transform_sender; #else #include #include -#include +#include #include -#include +#include +#include #include -#include #include #include #include -#include #include #include -#include #include +#include #include #include #include #endif -namespace beman::execution::detail { +// ---------------------------------------------------------------------------- -template -struct fixed_completions_helper; +namespace beman::execution::detail { +template +struct bulk_traits; template -struct fixed_completions_helper> { - - template - struct may_throw; - template - struct may_throw { - static constexpr bool value = - std::same_as && !::std::is_nothrow_invocable(); - }; - template - struct may_throw> { - static constexpr bool value = (false || ... || may_throw::value); - }; +struct bulk_traits { // for bulk_chunked + static constexpr bool is_invocable = ::std::is_invocable_v; - using type = std::conditional_t::value, - completion_signatures, - completion_signatures>; + static constexpr bool is_nothrow_invocable = ::std::is_nothrow_invocable_v; + + static auto invoke(F& fn, Shape shape, Args&... args) noexcept(is_nothrow_invocable) -> void { + if (shape > static_cast(0)) [[likely]] { + std::invoke(fn, 0, shape, args...); + } + } }; -struct bulk_t : ::beman::execution::sender_adaptor_closure { +template +struct bulk_traits { // for bulk_unchunked + static constexpr bool is_invocable = ::std::invocable; - template - requires(std::is_integral_v && ::beman::execution::detail::movable_value) - auto operator()(Shape&& shape, f&& fun) const { - return ::beman::execution::detail::make_sender_adaptor( - *this, std::forward(shape), std::forward(fun)); + static constexpr bool is_nothrow_invocable = ::std::is_nothrow_invocable_v; + + static auto invoke(F& fn, Shape shape, Args&... args) noexcept(is_nothrow_invocable) -> void { + for (auto i = static_cast(0); i < shape; ++i) { + std::invoke(fn, i, args...); + } } +}; + +template +struct bulk_transform_signatures; + +template +struct bulk_transform_signatures> { + template + struct is_nothrow : ::std::true_type {}; - template - requires(::beman::execution::sender && std::is_integral_v && - ::beman::execution::detail::movable_value) - auto operator()(Sender&& sndr, Shape&& shape, f&& fun) const { + template + struct is_nothrow<::beman::execution::set_value_t(Args...)> + : ::std::bool_constant< + ::beman::execution::detail::bulk_traits::is_nothrow_invocable> {}; - auto domain{::beman::execution::detail::get_domain_early(sndr)}; + using type = ::beman::execution::detail::meta::unique<::beman::execution::detail::meta::combine< + ::beman::execution::completion_signatures, + ::std::conditional_t< + (... && is_nothrow::value), + ::beman::execution::completion_signatures<>, + ::beman::execution::completion_signatures<::beman::execution::set_error_t(::std::exception_ptr)>>>>; +}; + +template +struct bulk_algo_t : ::beman::execution::sender_adaptor_closure> { + template + requires(::beman::execution::is_execution_policy_v<::std::remove_cvref_t> && ::std::integral && + ::std::copy_constructible<::std::decay_t>) + auto operator()(Policy&& policy, Shape shape, F&& f) const { + return ::beman::execution::detail::make_sender_adaptor( + *this, ::std::forward(policy), shape, ::std::forward(f)); + } + template + requires(::beman::execution::sender && + ::beman::execution::is_execution_policy_v<::std::remove_cvref_t> && ::std::integral && + ::std::copy_constructible<::std::decay_t>) + auto operator()(Sender&& sndr, Policy&& policy, Shape shape, F&& f) const { return ::beman::execution::transform_sender( - domain, + ::beman::execution::detail::get_domain_early(sndr), ::beman::execution::detail::make_sender( - *this, ::beman::execution::detail::product_type{shape, fun}, std::forward(sndr))); + *this, + ::beman::execution::detail::product_type<::std::remove_cvref_t, Shape, ::std::decay_t>{ + ::std::forward(policy), shape, ::std::forward(f)}, + ::std::forward(sndr))); } private: - template - using fixed_completions = typename fixed_completions_helper::type; - template + static constexpr bool is_chunked = IsChunked; + + template struct get_signatures; - template - struct get_signatures<::beman::execution::detail::basic_sender<::beman::execution::detail::bulk_t, - ::beman::execution::detail::product_type, - Sender>, - Env> { - using completions = decltype(::beman::execution::get_completion_signatures()); - using type = ::beman::execution::detail::meta::unique< - ::beman::execution::detail::meta::combine>>; + template + struct get_signatures< + ::beman::execution::detail:: + basic_sender, Sender>, + Env...> { + using type = typename ::beman::execution::detail::bulk_transform_signatures< + is_chunked, + F, + Shape, + ::beman::execution::completion_signatures_of_t>::type; }; public: template - static consteval auto get_completion_signatures() { - return typename get_signatures, Env...>::type{}; + static consteval auto get_completion_signatures() noexcept { + return typename get_signatures<::std::remove_cvref_t, Env...>::type{}; } - struct impls_for : ::beman::execution::detail::default_impls { + struct impls_for : ::beman::execution::detail::default_impls { struct complete_impl { - template - requires(!::std::same_as || std::is_invocable_v) + template + requires(!::std::same_as || + ::beman::execution::detail::bulk_traits::is_invocable) auto operator()(Index, - ::beman::execution::detail::product_type& state, - Rcvr& rcvr, + ::beman::execution::detail::product_type& state, + Receiver& rcvr, Tag, Args&&... args) const noexcept -> void { - if constexpr (std::same_as) { - auto& [shape, f] = state; - - using s_type = std::remove_cvref_t; - - constexpr bool nothrow = noexcept(f(s_type(shape), args...)); - + if constexpr (::std::same_as) { + auto& [policy, shape, f] = state; + constexpr bool nothrow = + ::beman::execution::detail::bulk_traits::is_nothrow_invocable; try { [&]() noexcept(nothrow) { - for (decltype(s_type(shape)) i = 0; i < shape; i++) { - f(s_type(i), args...); - } - Tag()(std::move(rcvr), std::forward(args)...); + ::beman::execution::detail::bulk_traits::invoke( + f, shape, args...); + Tag()(::std::move(rcvr), ::std::forward(args)...); }(); - } catch (...) { - if constexpr (not nothrow) { - ::beman::execution::set_error(std::move(rcvr), std::current_exception()); + if constexpr (!nothrow) { + ::beman::execution::set_error(::std::move(rcvr), ::std::current_exception()); } } } else { - Tag()(std::move(rcvr), std::forward(args)...); + Tag()(::std::move(rcvr), ::std::forward(args)...); } } }; - static constexpr auto complete{complete_impl{}}; + static constexpr complete_impl complete{}; + }; +}; + +using bulk_chunked_t = ::beman::execution::detail::bulk_algo_t; + +using bulk_unchunked_t = ::beman::execution::detail::bulk_algo_t; + +struct bulk_t : ::beman::execution::sender_adaptor_closure { + template + requires(::beman::execution::is_execution_policy_v<::std::remove_cvref_t> && ::std::integral && + ::std::copy_constructible<::std::decay_t>) + auto operator()(Policy&& policy, Shape shape, F&& f) const { + return ::beman::execution::detail::make_sender_adaptor( + *this, ::std::forward(policy), shape, ::std::forward(f)); + } + + template + requires(::beman::execution::sender && + ::beman::execution::is_execution_policy_v<::std::remove_cvref_t> && ::std::integral && + ::std::copy_constructible<::std::decay_t>) + auto operator()(Sender&& sndr, Policy&& policy, Shape shape, F&& f) const { + return ::beman::execution::transform_sender( + ::beman::execution::detail::get_domain_early(sndr), + ::beman::execution::detail::make_sender( + *this, + ::beman::execution::detail::product_type<::std::remove_cvref_t, Shape, ::std::decay_t>{ + ::std::forward(policy), shape, ::std::forward(f)}, + ::std::forward(sndr))); + } + + template <::beman::execution::detail::sender_for Sender, typename... Env> + auto transform_sender(Sender&& sndr, Env&&...) const { + auto data = ::beman::execution::detail::forward_like(sndr.template get<1>()); + auto child = ::beman::execution::detail::forward_like(sndr.template get<2>()); + + auto& policy = data.template get<0>(); + auto& shape = data.template get<1>(); + auto& f = data.template get<2>(); + + return bulk_chunked_t{}(::std::move(child), + policy, + shape, + this->wrap_chunked<::std::remove_cvref_t>(::std::move(f))); + } + + private: + template + struct get_signatures; + template + struct get_signatures<::beman::execution::detail:: + basic_sender, Sender>, + Env...> { + using type = typename ::beman::execution::detail::bulk_transform_signatures< + false, + F, + Shape, + ::beman::execution::completion_signatures_of_t>::type; }; + + template + static auto wrap_chunked(Fn f) noexcept { + return [f = std::move(f)](Shape begin, Shape end, Args&&... args) noexcept( + ::std::is_nothrow_invocable_v) { + while (begin != end) { + std::invoke(f, begin++, args...); + } + }; + } + + public: + template + static consteval auto get_completion_signatures() noexcept { + return typename get_signatures<::std::remove_cvref_t, Env...>::type{}; + } }; } // namespace beman::execution::detail -#include +// ---------------------------------------------------------------------------- namespace beman::execution { -using bulk_t = ::beman::execution::detail::bulk_t; -inline constexpr ::beman::execution::bulk_t bulk{}; +using bulk_t = ::beman::execution::detail::bulk_t; +using bulk_chunked_t = ::beman::execution::detail::bulk_chunked_t; +using bulk_unchunked_t = ::beman::execution::detail::bulk_unchunked_t; + +inline constexpr ::beman::execution::bulk_t bulk{}; +inline constexpr ::beman::execution::bulk_chunked_t bulk_chunked{}; +inline constexpr ::beman::execution::bulk_unchunked_t bulk_unchunked{}; } // namespace beman::execution diff --git a/include/beman/execution/detail/execution_policy.hpp b/include/beman/execution/detail/execution_policy.hpp new file mode 100644 index 00000000..aa0e6d81 --- /dev/null +++ b/include/beman/execution/detail/execution_policy.hpp @@ -0,0 +1,80 @@ +// include/beman/execution/detail/execution_policy.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_EXECUTION_POLICY +#define INCLUDED_BEMAN_EXECUTION_DETAIL_EXECUTION_POLICY +#include +#include +#ifdef BEMAN_HAS_IMPORT_STD +import std; +#else +#ifdef __cpp_lib_execution +#include +#else +#include +#endif +#endif + +// ---------------------------------------------------------------------------- + +namespace beman::execution { +#ifdef __cpp_lib_execution + +using ::std::execution::par; +using ::std::execution::parallel_policy; + +using ::std::execution::par_unseq; +using ::std::execution::parallel_unsequenced_policy; + +using ::std::execution::seq; +using ::std::execution::sequenced_policy; + +#if __cpp_lib_execution >= 201902L +using ::std::execution::unseq; +using ::std::execution::unsequenced_policy; +#endif + +template +struct is_execution_policy : ::std::is_execution_policy {}; + +template +inline constexpr bool is_execution_policy_v = ::beman::execution::is_execution_policy::value; + +#else + +struct sequenced_policy {}; +inline constexpr sequenced_policy seq{}; + +struct parallel_policy {}; +inline constexpr parallel_policy par{}; + +struct parallel_unsequenced_policy {}; +inline constexpr parallel_unsequenced_policy par_unseq{}; + +struct unsequenced_policy {}; +inline constexpr unsequenced_policy unseq{}; + +template +struct is_execution_policy : ::std::false_type {}; + +template <> +struct is_execution_policy< ::beman::execution::sequenced_policy> : ::std::true_type {}; + +template <> +struct is_execution_policy< ::beman::execution::parallel_policy> : ::std::true_type {}; + +template <> +struct is_execution_policy< ::beman::execution::parallel_unsequenced_policy> : ::std::true_type {}; + +template <> +struct is_execution_policy< ::beman::execution::unsequenced_policy> : ::std::true_type {}; + +template +inline constexpr bool is_execution_policy_v = ::beman::execution::is_execution_policy::value; + +#endif +} // namespace beman::execution + +// ---------------------------------------------------------------------------- + +#endif // INCLUDED_BEMAN_EXECUTION_DETAIL_EXECUTION_POLICY diff --git a/include/beman/execution/execution.hpp b/include/beman/execution/execution.hpp index 46ddb321..6d632826 100644 --- a/include/beman/execution/execution.hpp +++ b/include/beman/execution/execution.hpp @@ -12,12 +12,14 @@ import beman.execution.detail.affine_on; import beman.execution.detail.as_except_ptr; import beman.execution.detail.associate; import beman.execution.detail.bulk; +import beman.execution.detail.execution_policy; import beman.execution.detail.completion_signature; import beman.execution.detail.completion_signatures; import beman.execution.detail.connect; import beman.execution.detail.continues_on; import beman.execution.detail.counting_scope; import beman.execution.detail.env; +import beman.execution.detail.execution_policy; import beman.execution.detail.forwarding_query; import beman.execution.detail.get_allocator; import beman.execution.detail.get_await_completion_adaptor; @@ -29,6 +31,7 @@ import beman.execution.detail.get_forward_progress_guarantee; import beman.execution.detail.get_env; import beman.execution.detail.get_scheduler; import beman.execution.detail.get_stop_token; +import beman.execution.detail.inline_scheduler; import beman.execution.detail.into_variant; import beman.execution.detail.just; import beman.execution.detail.let; @@ -44,6 +47,7 @@ import beman.execution.detail.scheduler; import beman.execution.detail.scope_token; import beman.execution.detail.sender_adaptor_closure; import beman.execution.detail.sender_in; +import beman.execution.detail.sender_to; import beman.execution.detail.sender; import beman.execution.detail.set_error; import beman.execution.detail.set_stopped; @@ -76,6 +80,7 @@ import beman.execution.detail.write_env; #include #include #include +#include #include #include #include diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt index 73d84297..9344e402 100644 --- a/src/beman/execution/CMakeLists.txt +++ b/src/beman/execution/CMakeLists.txt @@ -69,6 +69,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/default_domain.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/default_impls.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/dependent_sender_error.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/execution_policy.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/emplace_from.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/env.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/env_of_t.hpp @@ -259,6 +260,7 @@ if(BEMAN_USE_MODULES) default_domain.cppm default_impls.cppm dependent_sender_error.cppm + execution_policy.cppm emplace_from.cppm enable_sender.cppm env_of_t.cppm diff --git a/src/beman/execution/bulk.cppm b/src/beman/execution/bulk.cppm index be9f7ac0..0cb360a6 100644 --- a/src/beman/execution/bulk.cppm +++ b/src/beman/execution/bulk.cppm @@ -9,4 +9,8 @@ export module beman.execution.detail.bulk; namespace beman::execution { export using beman::execution::bulk_t; export using beman::execution::bulk; +export using beman::execution::bulk_chunked_t; +export using beman::execution::bulk_chunked; +export using beman::execution::bulk_unchunked_t; +export using beman::execution::bulk_unchunked; } // namespace beman::execution diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index 99b30a61..3bf9e4c2 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -23,6 +23,7 @@ import beman.execution.detail.default_domain; export import beman.execution.detail.env; export import beman.execution.detail.env_of_t; export import beman.execution.detail.error_types_of_t; // [exec.getcomplsigs], completion signatures +export import beman.execution.detail.execution_policy; import beman.execution.detail.forwarding_query; import beman.execution.detail.get_allocator; import beman.execution.detail.get_await_completion_adaptor; @@ -201,6 +202,8 @@ export using ::beman::execution::let_value_t; export using ::beman::execution::let_error_t; export using ::beman::execution::let_stopped_t; export using ::beman::execution::bulk_t; +export using ::beman::execution::bulk_chunked_t; +export using ::beman::execution::bulk_unchunked_t; //-dk:TODO export using ::beman::execution::split_t; export using ::beman::execution::when_all_t; export using ::beman::execution::when_all_with_variant_t; @@ -219,6 +222,8 @@ export using ::beman::execution::let_value; export using ::beman::execution::let_error; export using ::beman::execution::let_stopped; export using ::beman::execution::bulk; +export using ::beman::execution::bulk_chunked; +export using ::beman::execution::bulk_unchunked; //-dk:TODO export using ::beman::execution::split; export using ::beman::execution::when_all; export using ::beman::execution::when_all_with_variant; diff --git a/src/beman/execution/execution_policy.cppm b/src/beman/execution/execution_policy.cppm new file mode 100644 index 00000000..9c79d4a1 --- /dev/null +++ b/src/beman/execution/execution_policy.cppm @@ -0,0 +1,26 @@ +module; +// src/beman/execution/execution_policy.cppm -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include + +export module beman.execution.detail.execution_policy; + +namespace beman::execution { +export using beman::execution::parallel_policy; +export using beman::execution::par; + +export using beman::execution::parallel_unsequenced_policy; +export using beman::execution::par_unseq; + +export using beman::execution::sequenced_policy; +export using beman::execution::seq; + +#if !defined(__cpp_lib_execution) || (__cpp_lib_execution >= 201902L) +export using beman::execution::unsequenced_policy; +export using beman::execution::unseq; +#endif + +export using beman::execution::is_execution_policy; +export using beman::execution::is_execution_policy_v; +} // namespace beman::execution diff --git a/tests/beman/execution/exec-bulk.test.cpp b/tests/beman/execution/exec-bulk.test.cpp index 4a883271..685f83b0 100644 --- a/tests/beman/execution/exec-bulk.test.cpp +++ b/tests/beman/execution/exec-bulk.test.cpp @@ -2,43 +2,41 @@ // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception #include +#include #include #include #include #ifdef BEMAN_HAS_MODULES import beman.execution; #else -#include -#include -#include -#include -#include +#include #endif namespace { + auto test_bulk() { - auto b0 = test_std::bulk(test_std::just(), 1, [](int) {}); + auto b0 = test_std::bulk(test_std::just(), test_std::seq, 1, [](int) {}); static_assert(test_std::sender); auto b0_env = test_std::get_env(b0); auto b0_completions = test_std::get_completion_signatures(); static_assert( - std::is_same_v>, + std::is_same_v< + decltype(b0_completions), + test_std::completion_signatures>, "Completion signatures do not match!"); int counter = 0; - auto b1 = test_std::bulk(test_std::just(), 5, [&](int i) { counter += i; }); + auto b1 = test_std::bulk(test_std::just(), test_std::seq, 5, [&](int i) { counter += i; }); static_assert(test_std::sender); auto b1_env = test_std::get_env(b0); auto b1_completions = test_std::get_completion_signatures(); static_assert( - std::is_same_v>, + std::is_same_v< + decltype(b1_completions), + test_std::completion_signatures>, "Completion signatures do not match!"); test_std::sync_wait(b1); ASSERT(counter == 10); @@ -48,20 +46,19 @@ auto test_bulk() { std::vector results(a.size(), 0); - auto b2 = test_std::bulk(test_std::just(a), a.size(), [&](std::size_t index, const std::vector& vec) { - results[index] = vec[index] * b[index]; - }); + auto b2 = test_std::bulk( + test_std::just(a), test_std::seq, a.size(), [&](std::size_t index, const std::vector& vec) { + results[index] = vec[index] * b[index]; + }); static_assert(test_std::sender); auto b2_env = test_std::get_env(b2); auto b2_completions = test_std::get_completion_signatures(); - static_assert( - std::is_same_v), - beman::execution::set_error_t(std::exception_ptr)>>, - "Completion signatures do not match!"); + static_assert(std::is_same_v), + test_std::set_error_t(std::exception_ptr)>>, + "Completion signatures do not match!"); test_std::sync_wait(b2); - // Expected results: element-wise multiplication of a and b std::vector expected{9, 20, 33, 52, 70, 90, 112, 136}; for (::std::size_t i = 0; i < results.size(); ++i) { @@ -69,52 +66,50 @@ auto test_bulk() { } } -auto test_bulk_noexept() { - auto b0 = test_std::bulk(test_std::just(), 1, [](int) noexcept {}); +auto test_bulk_noexcept() { + auto b0 = test_std::bulk(test_std::just(), test_std::seq, 1, [](int) noexcept {}); auto b0_env = test_std::get_env(b0); auto b0_completions = test_std::get_completion_signatures(); - static_assert(std::is_same_v>, + static_assert(std::is_same_v>, "Completion signatures do not match!"); static_assert(test_std::sender); int counter = 0; - auto b1 = test_std::bulk(test_std::just(), 5, [&](int i) noexcept { counter += i; }); + auto b1 = test_std::bulk(test_std::just(), test_std::seq, 5, [&](int i) noexcept { counter += i; }); static_assert(test_std::sender); auto b1_env = test_std::get_env(b0); auto b1_completions = test_std::get_completion_signatures(); - static_assert(std::is_same_v>, + static_assert(std::is_same_v>, "Completion signatures do not match!"); test_std::sync_wait(b1); ASSERT(counter == 10); } auto test_bulk_pipeable() { - auto b0 = test_std::just() | test_std::bulk(1, [](int) {}); + auto b0 = test_std::just() | test_std::bulk(test_std::seq, 1, [](int) {}); static_assert(test_std::sender); auto b0_env = test_std::get_env(b0); auto b0_completions = test_std::get_completion_signatures(); static_assert( - std::is_same_v>, + std::is_same_v< + decltype(b0_completions), + test_std::completion_signatures>, "Completion signatures do not match!"); int counter = 0; - auto b1 = test_std::just() | test_std::bulk(5, [&](int i) { counter += i; }); + auto b1 = test_std::just() | test_std::bulk(test_std::seq, 5, [&](int i) { counter += i; }); static_assert(test_std::sender); auto b1_env = test_std::get_env(b0); auto b1_completions = test_std::get_completion_signatures(); static_assert( - std::is_same_v>, + std::is_same_v< + decltype(b1_completions), + test_std::completion_signatures>, "Completion signatures do not match!"); test_std::sync_wait(b1); ASSERT(counter == 10); @@ -124,21 +119,20 @@ auto test_bulk_pipeable() { std::vector results(a.size(), 0); - auto b2 = test_std::just(a) | test_std::bulk(a.size(), [&](std::size_t index, const std::vector& vec) { + auto b2 = test_std::just(a) | + test_std::bulk(test_std::seq, a.size(), [&](std::size_t index, const std::vector& vec) { results[index] = vec[index] * b[index]; }); static_assert(test_std::sender); auto b2_env = test_std::get_env(b2); auto b2_completions = test_std::get_completion_signatures(); - static_assert( - std::is_same_v), - beman::execution::set_error_t(std::exception_ptr)>>, - "Completion signatures do not match!"); + static_assert(std::is_same_v), + test_std::set_error_t(std::exception_ptr)>>, + "Completion signatures do not match!"); test_std::sync_wait(b2); - // Expected results: element-wise multiplication of a and b std::vector expected{9, 20, 33, 52, 70, 90, 112, 136}; for (::std::size_t i = 0; i < results.size(); ++i) { @@ -146,6 +140,236 @@ auto test_bulk_pipeable() { } } +auto test_bulk_chunked() { + int counter = 0; + + auto b0 = test_std::bulk_chunked(test_std::just(), test_std::seq, 5, [&](int begin, int end) { + for (int i = begin; i < end; ++i) { + counter += i; + } + }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + ASSERT(counter == 10); +} + +auto test_bulk_chunked_with_values() { + std::vector a{1, 2, 3, 4}; + std::vector results(a.size(), 0); + + auto b0 = test_std::bulk_chunked(test_std::just(a), + test_std::seq, + a.size(), + [&](std::size_t begin, std::size_t end, const std::vector& vec) { + for (std::size_t i = begin; i < end; ++i) { + results[i] = vec[i] * 2; + } + }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + + std::vector expected{2, 4, 6, 8}; + for (std::size_t i = 0; i < results.size(); ++i) { + ASSERT(results[i] == expected[i]); + } +} + +auto test_bulk_chunked_pipeable() { + int counter = 0; + + auto b0 = test_std::just() | test_std::bulk_chunked(test_std::seq, 5, [&](int begin, int end) { + for (int i = begin; i < end; ++i) { + counter += i; + } + }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + ASSERT(counter == 10); +} + +auto test_bulk_unchunked() { + int counter = 0; + + auto b0 = test_std::bulk_unchunked(test_std::just(), test_std::seq, 5, [&](int i) { counter += i; }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + ASSERT(counter == 10); +} + +auto test_bulk_unchunked_with_values() { + std::vector a{1, 2, 3, 4}; + std::vector results(a.size(), 0); + + auto b0 = test_std::bulk_unchunked( + test_std::just(a), test_std::seq, a.size(), [&](std::size_t index, const std::vector& vec) { + results[index] = vec[index] * 3; + }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + + std::vector expected{3, 6, 9, 12}; + for (std::size_t i = 0; i < results.size(); ++i) { + ASSERT(results[i] == expected[i]); + } +} + +auto test_bulk_unchunked_pipeable() { + int counter = 0; + + auto b0 = test_std::just() | test_std::bulk_unchunked(test_std::seq, 5, [&](int i) { counter += i; }); + + static_assert(test_std::sender); + test_std::sync_wait(b0); + ASSERT(counter == 10); +} + +auto test_execution_policies() { + int counter = 0; + test_std::sync_wait(test_std::bulk(test_std::just(), test_std::par, 5, [&](int i) { counter += i; })); + ASSERT(counter == 10); + + counter = 0; + test_std::sync_wait(test_std::bulk(test_std::just(), test_std::par_unseq, 5, [&](int i) { counter += i; })); + ASSERT(counter == 10); + + counter = 0; + test_std::sync_wait(test_std::bulk(test_std::just(), test_std::unseq, 5, [&](int i) { counter += i; })); + ASSERT(counter == 10); +} + +auto test_bulk_shape_zero() { + int counter = 0; + test_std::sync_wait(test_std::bulk(test_std::just(), test_std::seq, 0, [&](int) { counter++; })); + ASSERT(counter == 0); + + test_std::sync_wait(test_std::bulk_chunked(test_std::just(), test_std::seq, 0, [&](int, int) { counter++; })); + ASSERT(counter == 0); + + test_std::sync_wait(test_std::bulk_unchunked(test_std::just(), test_std::seq, 0, [&](int) { counter++; })); + ASSERT(counter == 0); +} + +auto test_bulk_exception_handling() { + bool error_caught = false; + + auto sndr = test_std::bulk(test_std::just(), test_std::seq, 5, [](int i) { + if (i == 3) + throw std::runtime_error("test error"); + }); + + try { + test_std::sync_wait(sndr); + } catch (...) { + error_caught = true; + } + ASSERT(error_caught); +} + +auto test_bulk_chunked_exception_handling() { + bool error_caught = false; + + auto sndr = test_std::bulk_chunked( + test_std::just(), test_std::seq, 5, [](int, int) { throw std::runtime_error("chunked error"); }); + + try { + test_std::sync_wait(sndr); + } catch (...) { + error_caught = true; + } + ASSERT(error_caught); +} + +auto test_bulk_unchunked_exception_handling() { + bool error_caught = false; + + auto sndr = test_std::bulk_unchunked(test_std::just(), test_std::seq, 5, [](int i) { + if (i == 2) + throw std::runtime_error("unchunked error"); + }); + + try { + test_std::sync_wait(sndr); + } catch (...) { + error_caught = true; + } + ASSERT(error_caught); +} + +auto test_bulk_chunked_noexcept() { + auto b0 = test_std::bulk_chunked(test_std::just(), test_std::seq, 1, [](int, int) noexcept {}); + auto b0_env = test_std::get_env(b0); + auto b0_completions = test_std::get_completion_signatures(); + static_assert(std::is_same_v>, + "Chunked noexcept completion signatures do not match!"); +} + +auto test_bulk_unchunked_noexcept() { + auto b0 = test_std::bulk_unchunked(test_std::just(), test_std::seq, 1, [](int) noexcept {}); + auto b0_env = test_std::get_env(b0); + auto b0_completions = test_std::get_completion_signatures(); + static_assert(std::is_same_v>, + "Unchunked noexcept completion signatures do not match!"); +} + +auto test_bulk_shape_one() { + int counter = 0; + test_std::sync_wait(test_std::bulk(test_std::just(), test_std::seq, 1, [&](int i) { + ASSERT(i == 0); + counter++; + })); + ASSERT(counter == 1); + + counter = 0; + test_std::sync_wait(test_std::bulk_chunked(test_std::just(), test_std::seq, 1, [&](int begin, int end) { + ASSERT(begin == 0); + ASSERT(end == 1); + counter++; + })); + ASSERT(counter == 1); + + counter = 0; + test_std::sync_wait(test_std::bulk_unchunked(test_std::just(), test_std::seq, 1, [&](int i) { + ASSERT(i == 0); + counter++; + })); + ASSERT(counter == 1); +} + +auto test_bulk_chunked_covers_full_range() { + std::size_t seen_begin = 999; + std::size_t seen_end = 999; + int call_count = 0; + + test_std::sync_wait(test_std::bulk_chunked( + test_std::just(), test_std::seq, std::size_t(10), [&](std::size_t begin, std::size_t end) { + seen_begin = begin; + seen_end = end; + call_count++; + })); + + ASSERT(call_count == 1); + ASSERT(seen_begin == 0); + ASSERT(seen_end == 10); +} + +auto test_bulk_multiple_values() { + int sum_a = 0; + int sum_b = 0; + + test_std::sync_wait(test_std::bulk(test_std::just(10, 20), test_std::seq, 3, [&](int i, int a, int b) { + sum_a += a; + sum_b += b + i; + })); + + ASSERT(sum_a == 30); + ASSERT(sum_b == 60 + 0 + 1 + 2); +} + } // namespace TEST(exec_bulk) { @@ -153,8 +377,24 @@ TEST(exec_bulk) { try { test_bulk(); - test_bulk_noexept(); + test_bulk_noexcept(); test_bulk_pipeable(); + test_bulk_chunked(); + test_bulk_chunked_with_values(); + test_bulk_chunked_pipeable(); + test_bulk_unchunked(); + test_bulk_unchunked_with_values(); + test_bulk_unchunked_pipeable(); + test_execution_policies(); + test_bulk_shape_zero(); + test_bulk_exception_handling(); + test_bulk_chunked_exception_handling(); + test_bulk_unchunked_exception_handling(); + test_bulk_chunked_noexcept(); + test_bulk_unchunked_noexcept(); + test_bulk_shape_one(); + test_bulk_chunked_covers_full_range(); + test_bulk_multiple_values(); } catch (...) {