Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/benchmark/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdexec/execution.hpp>

#include <exec/detail/numa.hpp>
#include <exec/static_thread_pool.hpp>

Expand Down
9 changes: 8 additions & 1 deletion include/exec/sequence/iterate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@

#if !STDEXEC_NO_STDCPP_RANGES()

# include "../../stdexec/__detail/__concepts.hpp"
# include "../../stdexec/__detail/__connect.hpp"
# include "../../stdexec/__detail/__env.hpp"
# include "../../stdexec/__detail/__execution_fwd.hpp"
# include "../../stdexec/__detail/__operation_states.hpp"
# include "../../stdexec/__detail/__optional.hpp"
# include "../../stdexec/execution.hpp"
# include "../../stdexec/__detail/__receivers.hpp"
# include "../../stdexec/__detail/__schedulers.hpp"
# include "../../stdexec/__detail/__sender_concepts.hpp"

# include "../detail/basic_sequence.hpp"
# include "../sender_for.hpp"
Expand Down
24 changes: 18 additions & 6 deletions include/exec/sequence_senders.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@
*/
#pragma once

#include "../stdexec/execution.hpp"
#include "../stdexec/__detail/__execution_fwd.hpp"

#include "../stdexec/__detail/__completion_signatures.hpp"
#include "../stdexec/__detail/__concepts.hpp"
#include "../stdexec/__detail/__connect.hpp"
#include "../stdexec/__detail/__debug.hpp"
#include "../stdexec/__detail/__diagnostics.hpp"
#include "../stdexec/__detail/__env.hpp"
#include "../stdexec/__detail/__just.hpp"
#include "../stdexec/__detail/__meta.hpp"
#include "../stdexec/__detail/__receivers.hpp"
#include "../stdexec/__detail/__senders.hpp"
#include "../stdexec/__detail/__stop_token.hpp"
#include "../stdexec/__detail/__tag_invoke.hpp"
#include "../stdexec/__detail/__transform_sender.hpp"
#include "../stdexec/__detail/__type_traits.hpp"
#include "../stdexec/__detail/__utility.hpp"
#include "../stdexec/stop_token.hpp"

#include "completion_signatures.hpp"

STDEXEC_PRAGMA_PUSH()
Expand Down Expand Up @@ -565,7 +579,7 @@ namespace experimental::execution
struct __sequence_type_check_failure //
: STDEXEC::__compile_time_error<__sequence_type_check_failure<_Data, _What...>>
{
static_assert(std::is_nothrow_move_constructible_v<_Data>,
static_assert(STDEXEC::__nothrow_move_constructible<_Data>,
"The data member of sender_type_check_failure must be nothrow move "
"constructible.");

Expand Down Expand Up @@ -783,11 +797,9 @@ namespace experimental::execution

static_assert(sequence_sender<_Sequence>
|| has_sequence_item_types<_Sequence, env_of_t<_Receiver>>,
"The first argument to " STDEXEC_PP_STRINGIZE(STDEXEC) "::subscribe must be "
"a sequence sender");
"The first argument to exec::subscribe must be a sequence sender");
static_assert(receiver<_Receiver>,
"The second argument to " STDEXEC_PP_STRINGIZE(STDEXEC) "::subscribe must be "
"a receiver");
"The second argument to exec::subscribe must be a receiver");
#if STDEXEC_ENABLE_EXTRA_TYPE_CHECKING()
static_assert(__type_check_arguments<__tfx_seq_t, _Receiver>());
#endif
Expand Down
65 changes: 34 additions & 31 deletions include/exec/static_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,24 @@
#pragma once

#include "../stdexec/__detail/__atomic.hpp"
#include "../stdexec/__detail/__bulk.hpp"
#include "../stdexec/__detail/__completion_signatures.hpp"
#include "../stdexec/__detail/__concepts.hpp"
#include "../stdexec/__detail/__config.hpp"
#include "../stdexec/__detail/__domain.hpp"
#include "../stdexec/__detail/__execution_fwd.hpp"
#include "../stdexec/__detail/__execution_legacy.hpp"
#include "../stdexec/__detail/__get_completion_signatures.hpp"
#include "../stdexec/__detail/__intrusive_queue.hpp"
#include "../stdexec/__detail/__manual_lifetime.hpp" // IWYU pragma: keep
#include "../stdexec/__detail/__meta.hpp" // IWYU pragma: keep
#include "../stdexec/execution.hpp"
#include "../stdexec/__detail/__manual_lifetime.hpp"
#include "../stdexec/__detail/__meta.hpp"
#include "../stdexec/__detail/__optional.hpp"
#include "../stdexec/__detail/__receivers.hpp"
#include "../stdexec/__detail/__transform_completion_signatures.hpp"
#include "../stdexec/__detail/__tuple.hpp"
#include "../stdexec/__detail/__type_traits.hpp"
#include "../stdexec/__detail/__variant.hpp"

#include "detail/atomic_intrusive_queue.hpp"
#include "detail/bwos_lifo_queue.hpp"
#include "detail/numa.hpp"
Expand All @@ -37,7 +50,9 @@
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <limits>
#include <mutex>
#include <random>
#include <span>
#include <thread>
#include <type_traits>
Expand Down Expand Up @@ -690,12 +705,12 @@ namespace experimental::execution

alignas(64) __std::atomic<std::uint32_t> num_active_{};
alignas(64) remote_queue_list remotes_;
std::uint32_t thread_count_;
std::uint32_t max_steals_{thread_count_ + 1};
bwos_params params_;
std::vector<std::thread> threads_;
std::vector<std::optional<thread_state>> thread_states_;
numa_policy numa_;
std::uint32_t thread_count_;
std::uint32_t max_steals_{thread_count_ + 1};
bwos_params params_;
std::vector<std::thread> threads_;
std::vector<__optional<thread_state>> thread_states_;
numa_policy numa_;

struct thread_index_by_numa_node
{
Expand Down Expand Up @@ -1416,12 +1431,10 @@ namespace experimental::execution
}
};

using variant_t = __value_types_of_t<CvSender,
env_of_t<Receiver>,
__q<__decayed_std_tuple>,
__q<__nullable_std_variant>>;
using variant_t =
__value_types_of_t<CvSender, env_of_t<Receiver>, __q<__decayed_tuple>, __q<__variant>>;

variant_t data_;
variant_t data_{STDEXEC::__no_init};
_static_thread_pool& pool_;
Receiver rcvr_;
Shape shape_;
Expand All @@ -1440,7 +1453,7 @@ namespace experimental::execution
if constexpr (Parallelize)
{
return static_cast<std::uint32_t>(
(std::min) (shape_, static_cast<Shape>(pool_.available_parallelism())));
__umin({std::size_t(shape_), std::size_t(pool_.available_parallelism())}));
}
else
{
Expand All @@ -1451,19 +1464,8 @@ namespace experimental::execution
template <class F>
void apply(F f)
{
std::visit(
[&]<class Tuple>(Tuple& tupl) -> void
{
if constexpr (__std::same_as<Tuple, std::monostate>)
{
STDEXEC_TERMINATE();
}
else
{
std::apply([&](auto&... args) -> void { f(args...); }, tupl);
}
},
data_);
STDEXEC_ASSERT(!data_.__is_valueless());
__visit([&](auto& tupl) -> void { __apply(std::move(f), tupl); }, data_);
}

//! Construct from a pool, receiver, shape, and function.
Expand Down Expand Up @@ -1501,7 +1503,7 @@ namespace experimental::execution
template <class... As>
void set_value(As&&... as) noexcept
{
using tuple_t = __decayed_std_tuple<As...>;
using tuple_t = __decayed_tuple<As...>;

shared_state& state = shared_state_;

Expand All @@ -1514,6 +1516,7 @@ namespace experimental::execution
if constexpr (MayThrow)
{
STDEXEC::set_error(std::move(state.rcvr_), std::current_exception());
return;
}
}

Expand All @@ -1523,7 +1526,7 @@ namespace experimental::execution
}
else
{
state.apply([&](auto&... args)
state.apply([&](auto&... args) noexcept -> void
{ STDEXEC::set_value(std::move(state.rcvr_), std::move(args)...); });
}
}
Expand Down Expand Up @@ -1761,7 +1764,7 @@ namespace experimental::execution
std::size_t nthreads = this->pool_.available_parallelism();
bwos_params params = this->pool_.params();
std::size_t local_size = params.blockSize * params.numBlocks;
std::size_t chunk_size = (std::min) (size / nthreads, local_size * nthreads);
std::size_t chunk_size = __umin({size / nthreads, local_size * nthreads});
auto& remote_queue = *this->pool_.get_remote_queue();
auto it = std::ranges::begin(this->range_);
std::size_t i0 = 0;
Expand Down
19 changes: 19 additions & 0 deletions include/exec/thread_pool_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,28 @@
*/
#pragma once

#include "../stdexec/__detail/__execution_fwd.hpp"

#include "../stdexec/__detail/__connect.hpp"
#include "../stdexec/__detail/__env.hpp"
#include "../stdexec/__detail/__meta.hpp"
#include "../stdexec/__detail/__operation_states.hpp"
#include "../stdexec/__detail/__receivers.hpp"
#include "../stdexec/__detail/__schedulers.hpp"
#include "../stdexec/__detail/__transform_completion_signatures.hpp"
#include "../stdexec/__detail/__type_traits.hpp"

#include "sender_for.hpp"
#include "static_thread_pool.hpp"

#include <atomic>
#include <concepts>
#include <cstdint>
#include <exception>
#include <tuple>
#include <utility>
#include <variant>

namespace experimental::execution
{
struct CANNOT_DISPATCH_BULK_ALGORITHM_TO_THE_POOL_SCHEDULER;
Expand Down
9 changes: 8 additions & 1 deletion include/exec/trampoline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@
*/
#pragma once

#include "../stdexec/execution.hpp"
#include "../stdexec/__detail/__execution_fwd.hpp"

#include "../stdexec/__detail/__concepts.hpp"
#include "../stdexec/__detail/__domain.hpp"
#include "../stdexec/__detail/__env.hpp"
#include "../stdexec/__detail/__receivers.hpp"
#include "../stdexec/stop_token.hpp"

#include "completion_behavior.hpp"

#include <cstddef>
Expand Down
3 changes: 2 additions & 1 deletion include/stdexec/__detail/__execution_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
#pragma once

#include "__concepts.hpp"
#include "__config.hpp" // IWYU pragma: export

#include "__concepts.hpp"
#include "__meta.hpp"
#include "__type_traits.hpp"
#include "__utility.hpp"
Expand Down
Loading