diff --git a/CMakeLists.txt b/CMakeLists.txt index a47b66424..74d6ebc08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,7 +165,6 @@ PUBLIC FILES ${exec_headers} ${stdexec_headers} - include/execpools/thread_pool_base.hpp # stdexec_version_config.hpp is generated by rapids' script FILE_SET version_config TYPE HEADERS @@ -273,13 +272,6 @@ target_compile_options(stdexec_executable_flags INTERFACE -include stdexec/__detail/__force_include.hpp> ) -if (STDEXEC_IS_TOP_LEVEL) - # Integrate with LLVM/clang tooling - include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/clangd_compile_info.cmake) - add_executable(_clangd_helper_file include/._clangd_helper_file.cpp) - target_link_libraries(_clangd_helper_file PRIVATE STDEXEC::stdexec) -endif() - # Set up nvexec library option(STDEXEC_ENABLE_CUDA "Enable CUDA targets for non-nvc++ compilers" OFF) @@ -465,6 +457,19 @@ if(STDEXEC_BUILD_EXAMPLES) add_subdirectory(examples) endif() +if (STDEXEC_IS_TOP_LEVEL) + # Integrate with LLVM/clang tooling + include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/clangd_compile_info.cmake) + add_executable(_clangd_helper_file include/._clangd_helper_file.cpp) + target_link_libraries(_clangd_helper_file PRIVATE + STDEXEC::stdexec + $ + $ + $ + $ + ) +endif() + ############################################################################## # Install targets ------------------------------------------------------------ diff --git a/cmake/Modules/ConfigureASIO.cmake b/cmake/Modules/ConfigureASIO.cmake index c21442b8d..210e5833c 100644 --- a/cmake/Modules/ConfigureASIO.cmake +++ b/cmake/Modules/ConfigureASIO.cmake @@ -14,22 +14,13 @@ if(STDEXEC_ENABLE_ASIO) message(FATAL_ERROR "Unknown configuration for ASIO implementation: " ${STDEXEC_ASIO_IMPLEMENTATION}) endif() - set(ASIOEXEC_USES_BOOST ${STDEXEC_ASIO_USES_BOOST}) - set(ASIOEXEC_USES_STANDALONE ${STDEXEC_ASIO_USES_STANDALONE}) + set(STDEXEC_ASIO_CONFIG_HPP ${CMAKE_CURRENT_BINARY_DIR}/include/exec/asio/asio_config.hpp) - set(STDEXEC_ASIO_POOL_CONFIG_HPP ${CMAKE_CURRENT_BINARY_DIR}/include/execpools/asio/asio_config.hpp) - set(ASIOEXEC_CONFIG_HPP ${CMAKE_CURRENT_BINARY_DIR}/include/exec/asio/asio_config.hpp) - - configure_file( - include/execpools/asio/asio_config.hpp.in - ${STDEXEC_ASIO_POOL_CONFIG_HPP} - ) configure_file( include/exec/asio/asio_config.hpp.in - ${ASIOEXEC_CONFIG_HPP} + ${STDEXEC_ASIO_CONFIG_HPP} ) - file(GLOB_RECURSE boost_pool_sources CONFIGURE_DEPENDS include/execpools/asio/*.hpp) file(GLOB_RECURSE asioexec_sources CONFIGURE_DEPENDS include/exec/asio/*.hpp) if(${STDEXEC_ASIO_USES_BOOST}) @@ -41,98 +32,72 @@ if(STDEXEC_ENABLE_ASIO) OPTIONS "BOOST_SKIP_INSTALL_RULES OFF" ) - add_library(stdexec_boost_pool INTERFACE) - list(APPEND stdexec_export_targets stdexec_boost_pool) - add_library(STDEXEC::asio_pool ALIAS stdexec_boost_pool) + add_library(asioexec INTERFACE) + list(APPEND stdexec_export_targets asioexec) + add_library(STDEXEC::asioexec ALIAS asioexec) + + # These aliases are provided for backwards compatibility with the old target names + add_library(asioexec_boost ALIAS asioexec) + add_library(stdexec_boost_pool ALIAS asioexec) + add_library(STDEXEC::asio_pool ALIAS asioexec) + add_library(STDEXEC::asioexec_boost ALIAS asioexec) - target_sources(stdexec_boost_pool PUBLIC + target_sources(asioexec PUBLIC FILE_SET headers TYPE HEADERS BASE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/include - FILES ${boost_pool_sources} + FILES ${asioexec_sources} BASE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include - FILES ${STDEXEC_ASIO_POOL_CONFIG_HPP} + FILES ${STDEXEC_ASIO_CONFIG_HPP} ) - target_compile_definitions(stdexec_boost_pool INTERFACE STDEXEC_ASIO_USES_BOOST) + target_compile_definitions(asioexec INTERFACE STDEXEC_ASIO_USES_BOOST) - target_link_libraries(stdexec_boost_pool INTERFACE + target_link_libraries(asioexec INTERFACE STDEXEC::stdexec Boost::asio ) - install(TARGETS stdexec_boost_pool + install(TARGETS asioexec EXPORT stdexec-exports FILE_SET headers ) - add_library(asioexec_boost INTERFACE) - list(APPEND stdexec_export_targets asioexec_boost) - add_library(STDEXEC::asioexec_boost ALIAS asioexec_boost) - - target_sources(asioexec_boost PUBLIC - FILE_SET headers - TYPE HEADERS - BASE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/include - FILES ${asioexec_sources} - BASE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include - FILES ${ASIOEXEC_CONFIG_HPP} - ) - - target_compile_definitions(asioexec_boost INTERFACE STDEXEC_ASIO_USES_BOOST) - - target_link_libraries(asioexec_boost INTERFACE - STDEXEC::stdexec - Boost::asio - ) - install(TARGETS asioexec_boost EXPORT stdexec-exports FILE_SET headers) - elseif(${STDEXEC_ASIO_USES_STANDALONE}) include(cmake/import_standalone_asio.cmake) import_standalone_asio( TAG "asio-1-31-0" VERSION "1.31.0") - add_library(stdexec_asio_pool INTERFACE) - list(APPEND stdexec_export_targets stdexec_asio_pool) - add_library(STDEXEC::asio_pool ALIAS stdexec_asio_pool) + add_library(asioexec INTERFACE) + list(APPEND stdexec_export_targets asioexec) + add_library(STDEXEC::asioexec ALIAS asioexec) - target_sources(stdexec_asio_pool PUBLIC + # These aliases are provided for backwards compatibility with the old target names + add_library(asioexec_asio ALIAS asioexec) + add_library(stdexec_asio_pool ALIAS asioexec) + add_library(STDEXEC::asio_pool ALIAS asioexec) + add_library(STDEXEC::asioexec_asio ALIAS asioexec) + + target_sources(asioexec PUBLIC FILE_SET headers TYPE HEADERS BASE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/include - FILES ${boost_pool_sources} + FILES ${asioexec_sources} BASE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include - FILES ${STDEXEC_ASIO_POOL_CONFIG_HPP} + FILES ${STDEXEC_ASIO_CONFIG_HPP} ) - target_compile_definitions(stdexec_asio_pool INTERFACE STDEXEC_ASIO_USES_STANDALONE) + target_compile_definitions(asioexec INTERFACE STDEXEC_ASIO_USES_STANDALONE) - target_link_libraries(stdexec_asio_pool INTERFACE + target_link_libraries(asioexec INTERFACE STDEXEC::stdexec asio ) - install(TARGETS stdexec_asio_pool EXPORT stdexec-exports FILE_SET headers) - - add_library(asioexec_asio INTERFACE) - list(APPEND stdexec_export_targets asioexec_asio) - add_library(STDEXEC::asioexec_asio ALIAS asioexec_asio) - - target_sources(asioexec_asio PUBLIC + install(TARGETS asioexec + EXPORT stdexec-exports FILE_SET headers - TYPE HEADERS - BASE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/include - FILES ${asioexec_sources} - BASE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include - FILES ${ASIOEXEC_CONFIG_HPP} ) - target_compile_definitions(asioexec_asio INTERFACE STDEXEC_ASIO_USES_STANDALONE) - - target_link_libraries(asioexec_asio INTERFACE - STDEXEC::stdexec - asio - ) - install(TARGETS asioexec_asio EXPORT stdexec-exports FILE_SET headers) else() message(FATAL_ERROR "ASIO implementation is not configured") endif() diff --git a/cmake/Modules/ConfigureTBB.cmake b/cmake/Modules/ConfigureTBB.cmake index fdb18118b..d55373927 100644 --- a/cmake/Modules/ConfigureTBB.cmake +++ b/cmake/Modules/ConfigureTBB.cmake @@ -33,18 +33,23 @@ endif() if (STDEXEC_ENABLE_TBB) # CONFIGURE_DEPENDS ensures that CMake reconfigures when a relevant hpp file is # added or removed. - file(GLOB_RECURSE tbbpool_headers CONFIGURE_DEPENDS include/execpools/tbb/*.hpp) - add_library(tbbpool INTERFACE) - list(APPEND stdexec_export_targets tbbpool) - add_library(STDEXEC::tbbpool ALIAS tbbpool) - target_sources(tbbpool + file(GLOB_RECURSE tbbexec_headers CONFIGURE_DEPENDS include/exec/tbb/*.hpp) + add_library(tbbexec INTERFACE) + list(APPEND stdexec_export_targets tbbexec) + add_library(STDEXEC::tbbexec ALIAS tbbexec) + + # These aliases are provided for backwards compatibility with the old target names + add_library(tbbpool ALIAS tbbexec) + add_library(STDEXEC::tbbpool ALIAS tbbexec) + + target_sources(tbbexec PUBLIC FILE_SET headers TYPE HEADERS BASE_DIRS include - FILES ${tbbpool_headers} + FILES ${tbbexec_headers} ) - target_compile_definitions(tbbpool INTERFACE + target_compile_definitions(tbbexec INTERFACE STDEXEC_ENABLE_TBB ) @@ -52,11 +57,11 @@ if (STDEXEC_ENABLE_TBB) TBB::tbb ) - target_link_libraries(tbbpool INTERFACE + target_link_libraries(tbbexec INTERFACE STDEXEC::stdexec ) - install(TARGETS tbbpool + install(TARGETS tbbexec EXPORT stdexec-exports FILE_SET headers ) diff --git a/cmake/Modules/ConfigureTaskflow.cmake b/cmake/Modules/ConfigureTaskflow.cmake index 903a347c5..35cc9ba8d 100644 --- a/cmake/Modules/ConfigureTaskflow.cmake +++ b/cmake/Modules/ConfigureTaskflow.cmake @@ -7,15 +7,29 @@ if(STDEXEC_ENABLE_TASKFLOW) GIT_TAG v3.7.0 OPTIONS "TF_BUILD_TESTS OFF" ) - file(GLOB_RECURSE taskflow_pool include/execpools/taskflow/*.hpp) - add_library(taskflow_pool INTERFACE ${taskflowexec_sources}) - target_compile_definitions(taskflow_pool INTERFACE STDEXEC_ENABLE_TASKFLOW) - list(APPEND stdexec_export_targets taskflow_pool) - add_library(STDEXEC::taskflow_pool ALIAS taskflow_pool) + file(GLOB_RECURSE taskflowexec_headers CONFIGURE_DEPENDS include/exec/taskflow/*.hpp) + add_library(taskflowexec INTERFACE ${taskflowexec_headers}) + list(APPEND stdexec_export_targets taskflowexec) + add_library(STDEXEC::taskflowexec ALIAS taskflowexec) - target_link_libraries(taskflow_pool - INTERFACE - STDEXEC::stdexec + # These aliases are provided for backwards compatibility with the old target names + add_library(taskflow_pool ALIAS taskflowexec) + add_library(STDEXEC::taskflow_pool ALIAS taskflowexec) + + target_sources(taskflowexec + PUBLIC + FILE_SET headers + TYPE HEADERS + BASE_DIRS include + FILES ${taskflowexec_headers} + ) + target_compile_definitions(taskflowexec INTERFACE STDEXEC_ENABLE_TASKFLOW) + + target_link_libraries(stdexec INTERFACE Taskflow ) + + target_link_libraries(taskflowexec INTERFACE + STDEXEC::stdexec + ) endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 89b7edd75..ea0896041 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -72,21 +72,21 @@ endif() if (STDEXEC_ENABLE_TBB) add_executable(example.benchmark.tbb_thread_pool benchmark/tbb_thread_pool.cpp) - target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbpool) + target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbexec) add_executable(example.benchmark.tbb_thread_pool_nested benchmark/tbb_thread_pool_nested.cpp) - target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbpool) + target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbexec) add_executable(example.benchmark.fibonacci benchmark/fibonacci.cpp) - target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbpool) + target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbexec) endif() if(STDEXEC_ENABLE_TASKFLOW) add_executable(example.benchmark.taskflow_thread_pool benchmark/taskflow_thread_pool.cpp) - target_link_libraries(example.benchmark.taskflow_thread_pool PRIVATE STDEXEC::taskflow_pool) + target_link_libraries(example.benchmark.taskflow_thread_pool PRIVATE STDEXEC::taskflowexec) endif() if(STDEXEC_ENABLE_ASIO) add_executable(example.benchmark.asio_thread_pool benchmark/asio_thread_pool.cpp) -target_link_libraries(example.benchmark.asio_thread_pool PRIVATE STDEXEC::asio_pool) +target_link_libraries(example.benchmark.asio_thread_pool PRIVATE STDEXEC::asioexec) endif() diff --git a/examples/benchmark/asio_thread_pool.cpp b/examples/benchmark/asio_thread_pool.cpp index 902ae41ba..3d6dae57b 100644 --- a/examples/benchmark/asio_thread_pool.cpp +++ b/examples/benchmark/asio_thread_pool.cpp @@ -15,20 +15,19 @@ * limitations under the License. */ #include "./common.hpp" +#include #include -#include struct RunThread { - void operator()( - execpools::asio_thread_pool& pool, - std::size_t total_scheds, - std::size_t tid, - std::barrier<>& barrier, + void operator()(exec::asio::asio_thread_pool& pool, + std::size_t total_scheds, + std::size_t tid, + std::barrier<>& barrier, #ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE - std::span buffer, + std::span buffer, #endif - std::atomic& stop, - exec::numa_policy numa) { + std::atomic& stop, + exec::numa_policy numa) { int numa_node = numa.thread_index_to_node(tid); numa.bind_to_node(numa_node); auto scheduler = pool.get_scheduler(); @@ -83,5 +82,5 @@ struct RunThread { }; auto main(int argc, char** argv) -> int { - my_main(argc, argv); + my_main(argc, argv); } diff --git a/examples/benchmark/fibonacci.cpp b/examples/benchmark/fibonacci.cpp index 69632a4ea..14fa101f7 100644 --- a/examples/benchmark/fibonacci.cpp +++ b/examples/benchmark/fibonacci.cpp @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include @@ -102,10 +102,10 @@ auto main(int argc, char** argv) -> int { return -1; } - std::variant pool; + std::variant pool; if (argv[4] == std::string_view("tbb")) { - pool.emplace(static_cast(std::thread::hardware_concurrency())); + pool.emplace(static_cast(std::thread::hardware_concurrency())); } else { pool.emplace( std::thread::hardware_concurrency(), exec::bwos_params{}, exec::get_numa_policy()); diff --git a/examples/benchmark/taskflow_thread_pool.cpp b/examples/benchmark/taskflow_thread_pool.cpp index 98c724b9b..2c9ed9c58 100644 --- a/examples/benchmark/taskflow_thread_pool.cpp +++ b/examples/benchmark/taskflow_thread_pool.cpp @@ -16,11 +16,11 @@ */ #include "./common.hpp" #include -#include +#include struct RunThread { void operator()( - execpools::taskflow_thread_pool& pool, + exec::taskflow::taskflow_thread_pool& pool, std::size_t total_scheds, std::size_t tid, std::barrier<>& barrier, @@ -83,5 +83,5 @@ struct RunThread { }; auto main(int argc, char** argv) -> int { - my_main(argc, argv); + my_main(argc, argv); } diff --git a/examples/benchmark/tbb_thread_pool.cpp b/examples/benchmark/tbb_thread_pool.cpp index b58f48832..d9da9379c 100644 --- a/examples/benchmark/tbb_thread_pool.cpp +++ b/examples/benchmark/tbb_thread_pool.cpp @@ -16,19 +16,19 @@ */ #include "./common.hpp" #include -#include +#include struct RunThread { - void operator()( - execpools::tbb_thread_pool& pool, - std::size_t total_scheds, - std::size_t tid, - std::barrier<>& barrier, + void operator()(exec::tbb::tbb_thread_pool& pool, + std::size_t total_scheds, + std::size_t tid, + std::barrier<>& barrier, #ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE - std::span buffer, + std::span buffer, #endif - std::atomic& stop, - exec::numa_policy numa) { + std::atomic& stop, + exec::numa_policy numa) + { int numa_node = numa.thread_index_to_node(tid); numa.bind_to_node(numa_node); auto scheduler = pool.get_scheduler(); @@ -83,5 +83,5 @@ struct RunThread { }; auto main(int argc, char** argv) -> int { - my_main(argc, argv); + my_main(argc, argv); } diff --git a/examples/benchmark/tbb_thread_pool_nested.cpp b/examples/benchmark/tbb_thread_pool_nested.cpp index 0382fec0f..a5257560b 100644 --- a/examples/benchmark/tbb_thread_pool_nested.cpp +++ b/examples/benchmark/tbb_thread_pool_nested.cpp @@ -16,20 +16,19 @@ */ #include "./common.hpp" -#include +#include #include struct RunThread { - void operator()( - execpools::tbb_thread_pool& pool, - std::size_t total_scheds, - std::size_t tid, - std::barrier<>& barrier, + void operator()(exec::tbb::tbb_thread_pool& pool, + std::size_t total_scheds, + std::size_t tid, + std::barrier<>& barrier, #ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE - [[maybe_unused]] std::span buffer, + [[maybe_unused]] std::span buffer, #endif - std::atomic& stop, - exec::numa_policy numa) { + std::atomic& stop, + exec::numa_policy numa) { int numa_node = numa.thread_index_to_node(tid); numa.bind_to_node(numa_node); auto scheduler = pool.get_scheduler(); @@ -55,5 +54,5 @@ struct RunThread { }; auto main(int argc, char** argv) -> int { - my_main(argc, argv); + my_main(argc, argv); } diff --git a/include/asioexec/as_default_on.hpp b/include/asioexec/as_default_on.hpp index b4ee14177..6f97e9676 100644 --- a/include/asioexec/as_default_on.hpp +++ b/include/asioexec/as_default_on.hpp @@ -18,6 +18,8 @@ #pragma once +#include "../stdexec/__detail/__config.hpp" + #if STDEXEC_MSVC() # pragma message( \ "WARNING: The header is deprecated. Please include instead.") diff --git a/include/asioexec/asio_config.hpp.in b/include/asioexec/asio_config.hpp.in deleted file mode 100644 index 7b2b657db..000000000 --- a/include/asioexec/asio_config.hpp.in +++ /dev/null @@ -1,50 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * Copyright (c) 2025 Robert Leahy. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - * - * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://llvm.org/LICENSE.txt - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#cmakedefine01 ASIOEXEC_USES_STANDALONE -#cmakedefine01 ASIOEXEC_USES_BOOST - -#if ASIOEXEC_USES_BOOST -# include -# include -# include -# include -# define ASIOEXEC_ASIO_NAMESPACE boost::asio -#elif ASIOEXEC_USES_STANDALONE -# include -# include -# define ASIOEXEC_ASIO_NAMESPACE asio -#endif - -namespace asioexec { -#if ASIOEXEC_USES_BOOST - namespace asio_impl = ::boost::asio; - using error_code = ::boost::system::error_code; - using error_condition = ::boost::system::error_condition; - namespace errc = ::boost::system::errc; - using system_error = ::boost::system::system_error; -#elif ASIOEXEC_USES_STANDALONE - namespace asio_impl = ::asio; - using error_code = std::error_code; - using error_condition = std::error_condition; - using errc = std::errc; - using system_error = std::system_error; -#endif -} diff --git a/include/asioexec/completion_token.hpp b/include/asioexec/completion_token.hpp index b5dc5a066..a61ace675 100644 --- a/include/asioexec/completion_token.hpp +++ b/include/asioexec/completion_token.hpp @@ -18,6 +18,8 @@ #pragma once +#include "../stdexec/__detail/__config.hpp" + #if STDEXEC_MSVC() # pragma message( \ "WARNING: The header is deprecated. Please include instead.") diff --git a/include/asioexec/executor_with_default.hpp b/include/asioexec/executor_with_default.hpp index 8038ac89c..a839d4480 100644 --- a/include/asioexec/executor_with_default.hpp +++ b/include/asioexec/executor_with_default.hpp @@ -18,6 +18,8 @@ #pragma once +#include "../stdexec/__detail/__config.hpp" + #if STDEXEC_MSVC() # pragma message( \ "WARNING: The header is deprecated. Please include instead.") diff --git a/include/asioexec/use_sender.hpp b/include/asioexec/use_sender.hpp index 10e60c3bb..9883b9b95 100644 --- a/include/asioexec/use_sender.hpp +++ b/include/asioexec/use_sender.hpp @@ -18,6 +18,8 @@ #pragma once +#include "../stdexec/__detail/__config.hpp" + #if STDEXEC_MSVC() # pragma message( \ "WARNING: The header is deprecated. Please include instead.") diff --git a/include/exec/asio/asio_config.hpp.in b/include/exec/asio/asio_config.hpp.in index cfad32530..2d293ddc8 100644 --- a/include/exec/asio/asio_config.hpp.in +++ b/include/exec/asio/asio_config.hpp.in @@ -18,29 +18,29 @@ #pragma once -#cmakedefine01 ASIOEXEC_USES_STANDALONE -#cmakedefine01 ASIOEXEC_USES_BOOST +#cmakedefine01 STDEXEC_ASIO_USES_STANDALONE +#cmakedefine01 STDEXEC_ASIO_USES_BOOST -#if ASIOEXEC_USES_BOOST +#if STDEXEC_ASIO_USES_BOOST # include # include # include # include # define ASIOEXEC_ASIO_NAMESPACE boost::asio -#elif ASIOEXEC_USES_STANDALONE +#elif STDEXEC_ASIO_USES_STANDALONE # include # include # define ASIOEXEC_ASIO_NAMESPACE asio #endif namespace experimental::execution::asio { -#if ASIOEXEC_USES_BOOST +#if STDEXEC_ASIO_USES_BOOST namespace asio_impl = ::boost::asio; using error_code = ::boost::system::error_code; using error_condition = ::boost::system::error_condition; namespace errc = ::boost::system::errc; using system_error = ::boost::system::system_error; -#elif ASIOEXEC_USES_STANDALONE +#elif STDEXEC_ASIO_USES_STANDALONE namespace asio_impl = ::asio; using error_code = std::error_code; using error_condition = std::error_condition; diff --git a/include/exec/asio/asio_thread_pool.hpp b/include/exec/asio/asio_thread_pool.hpp new file mode 100644 index 000000000..895952f42 --- /dev/null +++ b/include/exec/asio/asio_thread_pool.hpp @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2023 Ben FrantzDale + * Copyright (c) 2021-2023 Facebook, Inc. and its affiliates. + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "../thread_pool_base.hpp" + +namespace experimental::execution::asio +{ + class asio_thread_pool : public exec::thread_pool_base + { + public: + asio_thread_pool() + : pool_() + , executor_(pool_.executor()) + {} + + explicit asio_thread_pool(uint32_t num_threads) + : pool_(num_threads) + , executor_(pool_.executor()) + {} + + ~asio_thread_pool() = default; + + [[nodiscard]] + auto available_parallelism() const -> std::uint32_t + { + return static_cast( + asio_impl::query(executor_, asio_impl::execution::occupancy)); + } + + [[nodiscard]] + auto get_executor() const + { + return executor_; + } + + private: + friend exec::thread_pool_base; + + template + friend struct exec::_pool_::opstate; + + [[nodiscard]] + static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee + { + return STDEXEC::forward_progress_guarantee::parallel; + } + + void enqueue(exec::_pool_::task_base* task, std::uint32_t tid = 0) noexcept + { + asio_impl::post(pool_, [task, tid] { task->execute_(task, /*tid=*/tid); }); + } + + asio_impl::thread_pool pool_; + // Need to store implicitly the executor, thread_pool::executor() is not const + asio_impl::thread_pool::executor_type executor_; + }; +} // namespace experimental::execution::asio + +namespace exec = experimental::execution; diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index 408b3b1c3..e76befad6 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -1205,13 +1205,15 @@ namespace experimental::execution , thread_index_{tid} , constraints_{constraints} { - this->execute_ = [](task_base* t, std::uint32_t const /* tid */) noexcept + this->execute_ = [](task_base* t, + [[maybe_unused]] + std::uint32_t const tid) noexcept { auto& op = *static_cast<_opstate*>(t); auto stoken = get_stop_token(get_env(op.rcvr_)); - // NOLINTNEXTLINE(bugprone-branch-clone) + if constexpr (STDEXEC::unstoppable_token) - { + { // NOLINT(bugprone-branch-clone) STDEXEC::set_value(static_cast(op.rcvr_)); } else if (stoken.stop_requested()) diff --git a/include/exec/taskflow/taskflow_thread_pool.hpp b/include/exec/taskflow/taskflow_thread_pool.hpp new file mode 100644 index 000000000..434aba3ef --- /dev/null +++ b/include/exec/taskflow/taskflow_thread_pool.hpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023 Ben FrantzDale + * Copyright (c) 2024 David Eles + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "../thread_pool_base.hpp" + +namespace experimental::execution::taskflow +{ + class taskflow_thread_pool : public exec::thread_pool_base + { + public: + //! Constructor forwards to tf::Executor constructor: + template + requires STDEXEC::__std::constructible_from + explicit taskflow_thread_pool(Args&&... args) + : executor_(std::forward(args)...) + {} + + [[nodiscard]] + auto available_parallelism() const -> std::uint32_t + { + return static_cast(executor_.num_workers()); + } + private: + [[nodiscard]] + static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee + { + return STDEXEC::forward_progress_guarantee::parallel; + } + + friend exec::thread_pool_base; + + template + friend struct exec::_pool_::opstate; + + void enqueue(exec::_pool_::task_base* task, std::uint32_t tid = 0) noexcept + { + executor_.silent_async([task, tid] { task->execute_(task, /*tid=*/tid); }); + } + + tf::Executor executor_; + }; +} // namespace experimental::execution::taskflow + +namespace exec = experimental::execution; diff --git a/include/exec/tbb/tbb_thread_pool.hpp b/include/exec/tbb/tbb_thread_pool.hpp new file mode 100644 index 000000000..db59cff32 --- /dev/null +++ b/include/exec/tbb/tbb_thread_pool.hpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 Ben FrantzDale + * Copyright (c) 2021-2023 Facebook, Inc. and its affiliates. + * Copyright (c) 2021-2024 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "../thread_pool_base.hpp" + +namespace experimental::execution::tbb +{ + class tbb_thread_pool : public exec::thread_pool_base + { + public: + //! Constructor forwards to ::tbb::task_arena constructor: + template + requires STDEXEC::__std::constructible_from<::tbb::task_arena, Args...> + explicit tbb_thread_pool(Args&&... args) + : arena_{std::forward(args)...} + { + arena_.initialize(); + } + + [[nodiscard]] + auto available_parallelism() const -> std::uint32_t + { + return static_cast(arena_.max_concurrency()); + } + + private: + friend exec::thread_pool_base; + + template + friend struct exec::_pool_::opstate; + + [[nodiscard]] + static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee + { + return STDEXEC::forward_progress_guarantee::parallel; + } + + void enqueue(exec::_pool_::task_base* task, std::uint32_t tid = 0) noexcept + { + arena_.enqueue([task, tid] { task->execute_(task, /*tid=*/tid); }); + } + + ::tbb::task_arena arena_{::tbb::task_arena::attach{}}; + }; +} // namespace experimental::execution::tbb + +namespace exec = experimental::execution; diff --git a/include/exec/thread_pool_base.hpp b/include/exec/thread_pool_base.hpp new file mode 100644 index 000000000..b4ebfe8a2 --- /dev/null +++ b/include/exec/thread_pool_base.hpp @@ -0,0 +1,564 @@ +/* + * Copyright (c) 2023 Ben FrantzDale + * Copyright (c) 2021-2023 Facebook, Inc. and its affiliates. + * Copyright (c) 2021-2024 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace experimental::execution +{ + struct CANNOT_DISPATCH_BULK_ALGORITHM_TO_THE_POOL_SCHEDULER; + struct BECAUSE_THERE_IS_NO_POOL_SCHEDULER_IN_THE_ENVIRONMENT; + struct ADD_A_CONTINUES_ON_TRANSITION_TO_THE_POOL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM; + + //! This is a P2300-style thread pool wrapping base class, which its docs describe as "A + //! class that represents an explicit, user-managed task scheduler arena." + //! + //! * template void enqueue(F &&f) + //! and + //! * template auto execute(F &&f) -> decltype(f()) + //! + namespace _pool_ + { + template + struct opstate; + + template + using __decay_ref_t = STDEXEC::__decay_t<_Ty>&; + + using task_base = exec::static_thread_pool::task_base; + } // namespace _pool_ + + template // CRTP + class thread_pool_base + { + template + friend struct _pool_::opstate; + + public: + struct scheduler; + + struct domain : STDEXEC::default_domain + { + template Sender, class Env> + static constexpr auto transform_sender(STDEXEC::set_value_t, Sender&& sndr, Env const & env) + { + auto& [tag, data, child] = sndr; + auto& [pol, shape, fun] = data; + + if constexpr (STDEXEC::__completes_on) + { + auto sch = + STDEXEC::get_completion_scheduler(STDEXEC::get_env(child), env); + using sender_t = + scheduler::template bulk_sender_t; + return sender_t{*sch.pool_, + STDEXEC::__forward_like(child), + shape, + STDEXEC::__forward_like(fun)}; + } + else + { + return STDEXEC::__not_a_sender< + STDEXEC::_WHAT_(CANNOT_DISPATCH_BULK_ALGORITHM_TO_THE_POOL_SCHEDULER), + STDEXEC::_WHY_(BECAUSE_THERE_IS_NO_POOL_SCHEDULER_IN_THE_ENVIRONMENT), + STDEXEC::_WHERE_(STDEXEC::_IN_ALGORITHM_, STDEXEC::tag_of_t), + STDEXEC::_TO_FIX_THIS_ERROR_( + ADD_A_CONTINUES_ON_TRANSITION_TO_THE_POOL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM), + STDEXEC::_WITH_PRETTY_SENDER_, + STDEXEC::_WITH_ENVIRONMENT_(Env)>(); + } + } + + template Sender, class Env> + static constexpr auto transform_sender(STDEXEC::set_value_t, Sender&& sndr, Env const & env); + }; + + struct scheduler + { + private: + template + friend struct _pool_::opstate; + + class sender + { + public: + using sender_concept = STDEXEC::sender_t; + template + static consteval auto get_completion_signatures() noexcept + { + if constexpr (STDEXEC::unstoppable_token>) + { + return STDEXEC::completion_signatures(); + } + else + { + return STDEXEC::completion_signatures(); + } + } + + template + auto + query(STDEXEC::get_completion_scheduler_t) const noexcept -> DerivedPoolType::scheduler + { + return pool_.get_scheduler(); + } + + template + auto query(STDEXEC::get_completion_domain_t) const noexcept -> domain + { + return {}; + } + + auto get_env() const noexcept -> sender const & + { + return *this; + } + + template + auto connect(Receiver rcvr) const -> _pool_::opstate + { + return _pool_::opstate{this->pool_, + static_cast(rcvr)}; + } + + private: + friend struct DerivedPoolType::scheduler; + + explicit sender(DerivedPoolType& pool) noexcept + : pool_(pool) + {} + + DerivedPoolType& pool_; + }; + + template + using bulk_non_throwing = STDEXEC::__mbool< + // If function invocation doesn't throw + STDEXEC::__nothrow_callable...> && + // and emplacing a tuple doesn't throw + noexcept(STDEXEC::__decayed_std_tuple(STDEXEC::__declval()...)) + // there's no need to advertise completion with `exception_ptr` + >; + + template + struct bulk_shared_state : _pool_::task_base + { + using variant_t = STDEXEC::__value_types_of_t, + STDEXEC::__q, + STDEXEC::__q>; + + variant_t data_; + DerivedPoolType& pool_; + Receiver rcvr_; + Shape shape_; + Fun fun_; + + std::atomic finished_threads_{0}; + std::atomic thread_with_exception_{0}; + std::exception_ptr exception_; + + [[nodiscard]] + auto num_agents_required() const -> std::uint32_t + { + // With work stealing, is std::min necessary, or can we feel free to ask for more agents (tasks) + // than we can actually deal with at one time? + return static_cast( + (std::min) (shape_, static_cast(pool_.available_parallelism()))); + } + + template + void apply(F f) + { + std::visit([&](auto& tupl) -> void + { std::apply([&](auto&... args) -> void { f(args...); }, tupl); }, + data_); + } + + explicit bulk_shared_state(DerivedPoolType& pool, Receiver rcvr, Shape shape, Fun fun) + : pool_(pool) + , rcvr_{static_cast(rcvr)} + , shape_{shape} + , fun_{fun} + , thread_with_exception_{num_agents_required()} + { + this->execute_ = [](_pool_::task_base* t, std::uint32_t tid) noexcept + { + auto& self = *static_cast(t); + auto total_threads = self.num_agents_required(); + + auto computation = [&](auto&... args) + { + auto [begin, end] = exec::_pool_::even_share(self.shape_, tid, total_threads); + self.fun_(begin, end, args...); + }; + + auto completion = [&](auto&... args) + { + STDEXEC::set_value(static_cast(self.rcvr_), std::move(args)...); + }; + + if constexpr (MayThrow) + { + STDEXEC_TRY + { + self.apply(computation); + } + STDEXEC_CATCH_ALL + { + std::uint32_t expected = total_threads; + + if (self.thread_with_exception_.compare_exchange_strong(expected, + tid, + std::memory_order_relaxed, + std::memory_order_relaxed)) + { + self.exception_ = std::current_exception(); + } + } + + bool const is_last_thread = self.finished_threads_.fetch_add(1) + == (total_threads - 1); + + if (is_last_thread) + { + if (self.exception_) + { + STDEXEC::set_error(static_cast(self.rcvr_), + std::move(self.exception_)); + } + else + { + self.apply(completion); + } + } + } + else + { + self.apply(computation); + + bool const is_last_thread = self.finished_threads_.fetch_add(1) + == (total_threads - 1); + + if (is_last_thread) + { + self.apply(completion); + } + } + }; + } + }; + + template + struct bulk_receiver + { + using receiver_concept = STDEXEC::receiver_t; + + using shared_state = bulk_shared_state; + + void enqueue() noexcept + { + shared_state_.pool_.bulk_enqueue(&shared_state_, shared_state_.num_agents_required()); + } + + template + void set_value(As&&... as) noexcept + { + using tuple_t = STDEXEC::__decayed_std_tuple; + + shared_state& state = shared_state_; + + if constexpr (MayThrow) + { + STDEXEC_TRY + { + state.data_.template emplace(static_cast(as)...); + } + STDEXEC_CATCH_ALL + { + STDEXEC::set_error(std::move(state.rcvr_), std::current_exception()); + } + } + else + { + state.data_.template emplace(static_cast(as)...); + } + + if (state.shape_) + { + enqueue(); + } + else + { + state.apply([&](auto&... args) + { STDEXEC::set_value(std::move(state.rcvr_), std::move(args)...); }); + } + } + + template + void set_error(Error&& err) noexcept + { + shared_state& state = shared_state_; + STDEXEC::set_error(static_cast(state.rcvr_), static_cast(err)); + } + + void set_stopped() noexcept + { + shared_state& state = shared_state_; + STDEXEC::set_stopped(static_cast(state.rcvr_)); + } + + auto get_env() const noexcept -> STDEXEC::env_of_t + { + return STDEXEC::get_env(shared_state_.rcvr_); + } + + shared_state& shared_state_; + }; + + template + struct bulk_opstate + { + static constexpr bool may_throw = + !STDEXEC::__value_types_of_t, + STDEXEC::__mbind_front_q, + STDEXEC::__q>::value; + + using bulk_rcvr = bulk_receiver; + using shared_state = bulk_shared_state; + using inner_opstate = STDEXEC::connect_result_t; + + void start() & noexcept + { + STDEXEC::start(inner_op_); + } + + bulk_opstate(DerivedPoolType& pool, Shape shape, Fun fun, CvSender&& sndr, Receiver rcvr) + : shared_state_(pool, static_cast(rcvr), shape, fun) + , inner_op_{STDEXEC::connect(static_cast(sndr), bulk_rcvr{shared_state_})} + {} + + shared_state shared_state_; + inner_opstate inner_op_; + }; + + template + struct bulk_sender + { + using sender_concept = STDEXEC::sender_t; + + template + using _with_error_invoke_t = STDEXEC::__eptr_completion_unless_t, Env...>, + STDEXEC::__mtransform, + STDEXEC::__mbind_front_q>, + STDEXEC::__q>>; + + template + using _set_value_t = + STDEXEC::completion_signatures...)>; + + template + using _completion_signatures_t = STDEXEC::transform_completion_signatures< + STDEXEC::__completion_signatures_of_t, Env...>, + _with_error_invoke_t, + _set_value_t>; + + template + using bulk_opstate_t = + bulk_opstate, Receiver, Shape, Fun>; + + template Self, STDEXEC::receiver Receiver> + requires STDEXEC::receiver_of>> + STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver rcvr) + noexcept(STDEXEC::__nothrow_constructible_from, + DerivedPoolType&, + Shape, + Fun, + Sender, + Receiver>) + -> bulk_opstate_t + { + return bulk_opstate_t{self.pool_, + self.shape_, + static_cast(self).fun_, + static_cast(self).sndr_, + static_cast(rcvr)}; + } + STDEXEC_EXPLICIT_THIS_END(connect) + + template Self, class... Env> + static consteval auto get_completion_signatures() // + -> _completion_signatures_t + { + return {}; + } + + struct attrs + { + template + requires STDEXEC::__queryable_with, Tag, As...> + auto query(Tag, As&&... as) const + noexcept(STDEXEC::__nothrow_queryable_with, Tag, As...>) + -> decltype(auto) + { + return STDEXEC::__query()(STDEXEC::get_env(sndr_.sndr_), static_cast(as)...); + } + + bulk_sender const & sndr_; + }; + + [[nodiscard]] + auto get_env() const noexcept -> attrs + { + return {*this}; + } + + DerivedPoolType& pool_; + Sender sndr_; + Shape shape_; + Fun fun_; + }; + + template + using bulk_sender_t = bulk_sender, Shape, Fun>; + + friend thread_pool_base; + + explicit scheduler(DerivedPoolType& pool) noexcept + : pool_(&pool) + {} + + DerivedPoolType* pool_; + + public: + auto operator==(scheduler const &) const -> bool = default; + + [[nodiscard]] + constexpr auto query(STDEXEC::get_forward_progress_guarantee_t) const noexcept + -> STDEXEC::forward_progress_guarantee + { + return pool_->forward_progress_guarantee(); + } + + template Tag> + [[nodiscard]] + constexpr auto query(STDEXEC::get_completion_scheduler_t) const noexcept -> scheduler + { + return *this; + } + + template Tag> + [[nodiscard]] + constexpr auto query(STDEXEC::get_completion_domain_t) const noexcept -> domain + { + return {}; + } + + template Tag> + [[nodiscard]] + constexpr auto query(STDEXEC::get_completion_behavior_t) const noexcept + { + return STDEXEC::completion_behavior::asynchronous; + } + + [[nodiscard]] + auto schedule() const noexcept -> sender + { + return sender{*pool_}; + } + }; + + [[nodiscard]] + auto get_scheduler() noexcept -> scheduler + { + return scheduler{static_cast(*this)}; + } + + [[nodiscard]] + auto available_parallelism() const -> std::uint32_t + { + return static_cast(*this).available_parallelism(); + } + + private: + void enqueue(_pool_::task_base* task, std::uint32_t tid = 0) noexcept + { + static_cast(*this).enqueue(task, tid); + } + + void bulk_enqueue(_pool_::task_base* task, std::uint32_t n_threads) noexcept + { + for (std::uint32_t tid = 0; tid < n_threads; ++tid) + { + this->enqueue(task, tid); + } + } + }; + + namespace _pool_ + { + template + struct opstate : _pool_::task_base + { + friend class thread_pool_base; + + PoolType& pool_; + Receiver rcvr_; + + explicit opstate(PoolType& pool, Receiver rcvr) + : pool_(pool) + , rcvr_(std::move(rcvr)) + { + this->execute_ = [](_pool_::task_base* t, std::uint32_t) noexcept + { + auto& op = *static_cast(t); + auto stoken = STDEXEC::get_stop_token(STDEXEC::get_env(op.rcvr_)); + + if constexpr (STDEXEC::unstoppable_token) + { // NOLINT(bugprone-branch-clone) + STDEXEC::set_value(static_cast(op.rcvr_)); + } + else if (stoken.stop_requested()) + { + STDEXEC::set_stopped(static_cast(op.rcvr_)); + } + else + { + STDEXEC::set_value(static_cast(op.rcvr_)); + } + }; + } + + void enqueue() noexcept + { + pool_.enqueue(this); + } + + void start() & noexcept + { + enqueue(); + } + }; + } // namespace _pool_ +} // namespace experimental::execution + +namespace exec = experimental::execution; diff --git a/include/execpools/asio/asio_config.hpp.in b/include/execpools/asio/asio_config.hpp.in deleted file mode 100644 index f74e2d30e..000000000 --- a/include/execpools/asio/asio_config.hpp.in +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#cmakedefine01 STDEXEC_ASIO_USES_STANDALONE -#cmakedefine01 STDEXEC_ASIO_USES_BOOST - -#if STDEXEC_ASIO_USES_BOOST -# include -#elif STDEXEC_ASIO_USES_STANDALONE -# include -#endif - -namespace execpools { -#if STDEXEC_ASIO_USES_BOOST - namespace asio_impl = boost::asio; -#elif STDEXEC_ASIO_USES_STANDALONE - namespace asio_impl = asio; -#endif -} // namespace execpools diff --git a/include/execpools/asio/asio_thread_pool.hpp b/include/execpools/asio/asio_thread_pool.hpp index 3c2584f83..2b3d74859 100644 --- a/include/execpools/asio/asio_thread_pool.hpp +++ b/include/execpools/asio/asio_thread_pool.hpp @@ -1,64 +1,38 @@ /* + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once - -#include -#include - -namespace execpools -{ - class asio_thread_pool : public execpools::thread_pool_base - { - public: - asio_thread_pool() - : pool_() - , executor_(pool_.executor()) - {} - explicit asio_thread_pool(uint32_t num_threads) - : pool_(num_threads) - , executor_(pool_.executor()) - {} - - ~asio_thread_pool() = default; - - [[nodiscard]] - auto available_parallelism() const -> std::uint32_t - { - return static_cast( - asio_impl::query(executor_, asio_impl::execution::occupancy)); - } +#pragma once - [[nodiscard]] - auto get_executor() const - { - return executor_; - } +#include "../../stdexec/__detail/__config.hpp" - private: - [[nodiscard]] - static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee - { - return STDEXEC::forward_progress_guarantee::parallel; - } +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: The header is deprecated. Please include instead.") +#else +# warning \ + "The header is deprecated. Please include instead." +#endif - friend execpools::thread_pool_base; +#include "../../exec/asio/asio_thread_pool.hpp" // IWYU pragma: export - template - friend struct execpools::operation; +namespace execpools +{ - void enqueue(execpools::task_base* task, std::uint32_t tid = 0) noexcept - { - asio_impl::post(pool_, [task, tid] { task->execute_(task, /*tid=*/tid); }); - } + using asio_thread_pool + [[deprecated("execpools::asio_thread_pool has been renamed to " + "exec::asio::asio_thread_pool instead.")]] = exec::asio::asio_thread_pool; - asio_impl::thread_pool pool_; - // Need to store implicitly the executor, thread_pool::executor() is not const - asio_impl::thread_pool::executor_type executor_; - }; } // namespace execpools diff --git a/include/execpools/taskflow/taskflow_thread_pool.hpp b/include/execpools/taskflow/taskflow_thread_pool.hpp index 4132b7c0b..0078cda3d 100644 --- a/include/execpools/taskflow/taskflow_thread_pool.hpp +++ b/include/execpools/taskflow/taskflow_thread_pool.hpp @@ -1,51 +1,38 @@ /* + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once -#include +#include "../../stdexec/__detail/__config.hpp" + +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: The header is deprecated. Please include instead.") +#else +# warning \ + "The header is deprecated. Please include instead." +#endif -#include +#include "../../exec/taskflow/taskflow_thread_pool.hpp" // IWYU pragma: export namespace execpools { - class taskflow_thread_pool : public execpools::thread_pool_base - { - public: - //! Constructor forwards to tbb::task_arena constructor: - template - requires STDEXEC::__std::constructible_from - explicit taskflow_thread_pool(Args&&... args) - : executor_(std::forward(args)...) - {} - - [[nodiscard]] - auto available_parallelism() const -> std::uint32_t - { - return static_cast(executor_.num_workers()); - } - private: - [[nodiscard]] - static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee - { - return STDEXEC::forward_progress_guarantee::parallel; - } - - friend execpools::thread_pool_base; - - template - friend struct execpools::operation; - - void enqueue(execpools::task_base* task, std::uint32_t tid = 0) noexcept - { - executor_.silent_async([task, tid] { task->execute_(task, /*tid=*/tid); }); - } + using taskflow_thread_pool [[deprecated( + "execpools::taskflow_thread_pool has been renamed to " + "exec::taskflow::taskflow_thread_pool instead.")]] = exec::taskflow::taskflow_thread_pool; - tf::Executor executor_; - }; } // namespace execpools diff --git a/include/execpools/tbb/tbb_thread_pool.hpp b/include/execpools/tbb/tbb_thread_pool.hpp index 85a8f22bc..40037e9eb 100644 --- a/include/execpools/tbb/tbb_thread_pool.hpp +++ b/include/execpools/tbb/tbb_thread_pool.hpp @@ -1,7 +1,5 @@ /* - * Copyright (c) 2023 Ben FrantzDale - * Copyright (c) 2021-2023 Facebook, Inc. and its affiliates. - * Copyright (c) 2021-2024 NVIDIA Corporation + * Copyright (c) 2026 NVIDIA Corporation * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with @@ -15,50 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #pragma once -#include +#include "../../stdexec/__detail/__config.hpp" + +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: The header is deprecated. Please include instead.") +#else +# warning \ + "The header is deprecated. Please include instead." +#endif -#include -#include +#include "../../exec/tbb/tbb_thread_pool.hpp" // IWYU pragma: export namespace execpools { - class tbb_thread_pool : public thread_pool_base - { - public: - //! Constructor forwards to tbb::task_arena constructor: - template - requires STDEXEC::__std::constructible_from - explicit tbb_thread_pool(Args&&... args) - : arena_{std::forward(args)...} - { - arena_.initialize(); - } - - [[nodiscard]] - auto available_parallelism() const -> std::uint32_t - { - return static_cast(arena_.max_concurrency()); - } - private: - [[nodiscard]] - static constexpr auto forward_progress_guarantee() -> STDEXEC::forward_progress_guarantee - { - return STDEXEC::forward_progress_guarantee::parallel; - } - - friend thread_pool_base; - - template - friend struct operation; - - void enqueue(task_base* task, std::uint32_t tid = 0) noexcept - { - arena_.enqueue([task, tid] { task->execute_(task, /*tid=*/tid); }); - } + using tbb_thread_pool + [[deprecated("execpools::tbb_thread_pool has been renamed to " + "exec::tbb::tbb_thread_pool instead.")]] = exec::tbb::tbb_thread_pool; - tbb::task_arena arena_{tbb::task_arena::attach{}}; - }; } // namespace execpools diff --git a/include/execpools/thread_pool_base.hpp b/include/execpools/thread_pool_base.hpp index 4f24e7ab8..1aa349b4a 100644 --- a/include/execpools/thread_pool_base.hpp +++ b/include/execpools/thread_pool_base.hpp @@ -17,540 +17,22 @@ */ #pragma once -#include +#include "../stdexec/__detail/__config.hpp" -namespace execpools -{ - struct CANNOT_DISPATCH_BULK_ALGORITHM_TO_THE_POOL_SCHEDULER; - struct BECAUSE_THERE_IS_NO_POOL_SCHEDULER_IN_THE_ENVIRONMENT; - struct ADD_A_CONTINUES_ON_TRANSITION_TO_THE_POOL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM; - - //! This is a P2300-style thread pool wrapping base class, which its docs describe as "A class that represents an - //! explicit, user-managed task scheduler arena." - //! Once set up, a task arena and it has - //! * template void enqueue(F &&f) - //! and - //! * template auto execute(F &&f) -> decltype(f()) - //! - template - struct operation; - - using task_base = exec::static_thread_pool::task_base; - - template // CRTP - class thread_pool_base - { - template - friend struct operation; - - public: - struct scheduler; - - struct domain : STDEXEC::default_domain - { - template Sender, class Env> - static constexpr auto transform_sender(STDEXEC::set_value_t, Sender&& sndr, Env const & env) - { - auto& [tag, data, child] = sndr; - auto& [pol, shape, fun] = data; - - if constexpr (STDEXEC::__completes_on) - { - auto sch = - STDEXEC::get_completion_scheduler(STDEXEC::get_env(child), env); - using sender_t = - scheduler::template bulk_sender_t; - return sender_t{*sch.pool_, - STDEXEC::__forward_like(child), - shape, - STDEXEC::__forward_like(fun)}; - } - else - { - return STDEXEC::__not_a_sender< - STDEXEC::_WHAT_(CANNOT_DISPATCH_BULK_ALGORITHM_TO_THE_POOL_SCHEDULER), - STDEXEC::_WHY_(BECAUSE_THERE_IS_NO_POOL_SCHEDULER_IN_THE_ENVIRONMENT), - STDEXEC::_WHERE_(STDEXEC::_IN_ALGORITHM_, STDEXEC::tag_of_t), - STDEXEC::_TO_FIX_THIS_ERROR_( - ADD_A_CONTINUES_ON_TRANSITION_TO_THE_POOL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM), - STDEXEC::_WITH_PRETTY_SENDER_, - STDEXEC::_WITH_ENVIRONMENT_(Env)>(); - } - } - - template Sender, class Env> - static constexpr auto transform_sender(STDEXEC::set_value_t, Sender&& sndr, Env const & env); - }; - - struct scheduler - { - private: - template - friend struct operation; - - class sender - { - public: - using sender_concept = STDEXEC::sender_t; - template - static consteval auto get_completion_signatures() noexcept - { - if constexpr (STDEXEC::unstoppable_token>) - { - return STDEXEC::completion_signatures(); - } - else - { - return STDEXEC::completion_signatures(); - } - } - - template - auto - query(STDEXEC::get_completion_scheduler_t) const noexcept -> DerivedPoolType::scheduler - { - return pool_.get_scheduler(); - } - - template - auto query(STDEXEC::get_completion_domain_t) const noexcept -> domain - { - return {}; - } - - auto get_env() const noexcept -> sender const & - { - return *this; - } - - template - auto connect(Receiver rcvr) const -> operation - { - return operation{this->pool_, static_cast(rcvr)}; - } - - private: - friend struct DerivedPoolType::scheduler; - - explicit sender(DerivedPoolType& pool) noexcept - : pool_(pool) - {} - - DerivedPoolType& pool_; - }; - - template - using bulk_non_throwing = STDEXEC::__mbool< - // If function invocation doesn't throw - STDEXEC::__nothrow_callable && - // and emplacing a tuple doesn't throw - noexcept(STDEXEC::__decayed_std_tuple(std::declval()...)) - // there's no need to advertise completion with `exception_ptr` - >; - - template - struct bulk_shared_state : task_base - { - using variant_t = STDEXEC::__value_types_of_t, - STDEXEC::__q, - STDEXEC::__q>; - - variant_t data_; - DerivedPoolType& pool_; - Receiver rcvr_; - Shape shape_; - Fun fun_; - - std::atomic finished_threads_{0}; - std::atomic thread_with_exception_{0}; - std::exception_ptr exception_; - - [[nodiscard]] - auto num_agents_required() const -> std::uint32_t - { - // With work stealing, is std::min necessary, or can we feel free to ask for more agents (tasks) - // than we can actually deal with at one time? - return static_cast( - (std::min) (shape_, static_cast(pool_.available_parallelism()))); - } - - template - void apply(F f) - { - std::visit([&](auto& tupl) -> void - { std::apply([&](auto&... args) -> void { f(args...); }, tupl); }, - data_); - } - - bulk_shared_state(DerivedPoolType& pool, Receiver rcvr, Shape shape, Fun fun) - : pool_(pool) - , rcvr_{static_cast(rcvr)} - , shape_{shape} - , fun_{fun} - , thread_with_exception_{num_agents_required()} - { - this->execute_ = [](task_base* t, std::uint32_t tid) noexcept - { - auto& self = *static_cast(t); - auto total_threads = self.num_agents_required(); - - auto computation = [&](auto&... args) - { - auto [begin, end] = exec::_pool_::even_share(self.shape_, tid, total_threads); - self.fun_(begin, end, args...); - }; - - auto completion = [&](auto&... args) - { - STDEXEC::set_value(static_cast(self.rcvr_), std::move(args)...); - }; - - if constexpr (MayThrow) - { - STDEXEC_TRY - { - self.apply(computation); - } - STDEXEC_CATCH_ALL - { - std::uint32_t expected = total_threads; - - if (self.thread_with_exception_.compare_exchange_strong(expected, - tid, - std::memory_order_relaxed, - std::memory_order_relaxed)) - { - self.exception_ = std::current_exception(); - } - } - - bool const is_last_thread = self.finished_threads_.fetch_add(1) - == (total_threads - 1); - - if (is_last_thread) - { - if (self.exception_) - { - STDEXEC::set_error(static_cast(self.rcvr_), - std::move(self.exception_)); - } - else - { - self.apply(completion); - } - } - } - else - { - self.apply(computation); - - bool const is_last_thread = self.finished_threads_.fetch_add(1) - == (total_threads - 1); - - if (is_last_thread) - { - self.apply(completion); - } - } - }; - } - }; - - template - struct bulk_receiver - { - using receiver_concept = STDEXEC::receiver_t; - - using shared_state = bulk_shared_state; - - void enqueue() noexcept - { - shared_state_.pool_.bulk_enqueue(&shared_state_, shared_state_.num_agents_required()); - } - - template - void set_value(As&&... as) noexcept - { - using tuple_t = STDEXEC::__decayed_std_tuple; - - shared_state& state = shared_state_; - - if constexpr (MayThrow) - { - STDEXEC_TRY - { - state.data_.template emplace(static_cast(as)...); - } - STDEXEC_CATCH_ALL - { - STDEXEC::set_error(std::move(state.rcvr_), std::current_exception()); - } - } - else - { - state.data_.template emplace(static_cast(as)...); - } - - if (state.shape_) - { - enqueue(); - } - else - { - state.apply([&](auto&... args) - { STDEXEC::set_value(std::move(state.rcvr_), std::move(args)...); }); - } - } - - template - void set_error(Error&& err) noexcept - { - shared_state& state = shared_state_; - STDEXEC::set_error(static_cast(state.rcvr_), static_cast(err)); - } +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: The header is deprecated. Please include instead.") +#else +# warning \ + "The header is deprecated. Please include instead." +#endif - void set_stopped() noexcept - { - shared_state& state = shared_state_; - STDEXEC::set_stopped(static_cast(state.rcvr_)); - } +#include "../exec/thread_pool_base.hpp" - auto get_env() const noexcept -> STDEXEC::env_of_t - { - return STDEXEC::get_env(shared_state_.rcvr_); - } - - shared_state& shared_state_; - }; - - template - struct bulk_op_state - { - static constexpr bool may_throw = - !STDEXEC::__value_types_of_t, - STDEXEC::__mbind_front_q, - STDEXEC::__q>::value; - - using bulk_rcvr = bulk_receiver; - using shared_state = bulk_shared_state; - using inner_op_state = STDEXEC::connect_result_t; - - shared_state shared_state_; - - inner_op_state inner_op_; - - void start() & noexcept - { - STDEXEC::start(inner_op_); - } - - bulk_op_state(DerivedPoolType& pool, Shape shape, Fun fun, CvSender&& sndr, Receiver rcvr) - : shared_state_(pool, static_cast(rcvr), shape, fun) - , inner_op_{STDEXEC::connect(static_cast(sndr), bulk_rcvr{shared_state_})} - {} - }; - - template - using __decay_ref = STDEXEC::__decay_t<_Ty>&; - - template - struct bulk_sender - { - using sender_concept = STDEXEC::sender_t; - - template - using _with_error_invoke_t = STDEXEC::__eptr_completion_unless_t, Env...>, - STDEXEC::__mtransform, - STDEXEC::__mbind_front_q>, - STDEXEC::__q>>; - - template - using _set_value_t = - STDEXEC::completion_signatures...)>; - - template - using _completion_signatures_t = STDEXEC::transform_completion_signatures< - STDEXEC::__completion_signatures_of_t, Env...>, - _with_error_invoke_t, - _set_value_t>; - - template - using bulk_op_state_t = - bulk_op_state, Receiver, Shape, Fun>; - - template Self, STDEXEC::receiver Receiver> - requires STDEXEC::receiver_of>> - STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver rcvr) - noexcept(STDEXEC::__nothrow_constructible_from, - DerivedPoolType&, - Shape, - Fun, - Sender, - Receiver>) - -> bulk_op_state_t - { - return bulk_op_state_t{self.pool_, - self.shape_, - static_cast(self).fun_, - static_cast(self).sndr_, - static_cast(rcvr)}; - } - STDEXEC_EXPLICIT_THIS_END(connect) - - template Self, class... Env> - static consteval auto get_completion_signatures() // - -> _completion_signatures_t - { - return {}; - } - - struct attrs - { - template - requires STDEXEC::__queryable_with, Tag, As...> - auto query(Tag, As&&... as) const - noexcept(STDEXEC::__nothrow_queryable_with, Tag, As...>) - -> decltype(auto) - { - return STDEXEC::__query()(STDEXEC::get_env(sndr_.sndr_), static_cast(as)...); - } - - bulk_sender const & sndr_; - }; - - [[nodiscard]] - auto get_env() const noexcept -> attrs - { - return {*this}; - } - - DerivedPoolType& pool_; - Sender sndr_; - Shape shape_; - Fun fun_; - }; - - template - using bulk_sender_t = bulk_sender, Shape, Fun>; - - friend thread_pool_base; - - explicit scheduler(DerivedPoolType& pool) noexcept - : pool_(&pool) - {} - - DerivedPoolType* pool_; - - public: - auto operator==(scheduler const &) const -> bool = default; - - [[nodiscard]] - constexpr auto query(STDEXEC::get_forward_progress_guarantee_t) const noexcept - -> STDEXEC::forward_progress_guarantee - { - return pool_->forward_progress_guarantee(); - } - - template Tag> - [[nodiscard]] - constexpr auto query(STDEXEC::get_completion_scheduler_t) const noexcept -> scheduler - { - return *this; - } - - template Tag> - [[nodiscard]] - constexpr auto query(STDEXEC::get_completion_domain_t) const noexcept -> domain - { - return {}; - } - - template Tag> - [[nodiscard]] - constexpr auto query(STDEXEC::get_completion_behavior_t) const noexcept - { - return STDEXEC::completion_behavior::asynchronous; - } - - [[nodiscard]] - auto schedule() const noexcept -> sender - { - return sender{*pool_}; - } - }; - - [[nodiscard]] - auto get_scheduler() noexcept -> scheduler - { - return scheduler{static_cast(*this)}; - } - - [[nodiscard]] - auto available_parallelism() const -> std::uint32_t - { - return static_cast(*this).available_parallelism(); - } - - private: - void enqueue(task_base* task, std::uint32_t tid = 0) noexcept - { - static_cast(*this).enqueue(task, tid); - } - - void bulk_enqueue(task_base* task, std::uint32_t n_threads) noexcept - { - for (std::uint32_t tid = 0; tid < n_threads; ++tid) - { - this->enqueue(task, tid); - } - } - }; - - template - struct operation : task_base - { - friend class thread_pool_base; - - PoolType& pool_; - Receiver rcvr_; - - explicit operation(PoolType& pool, Receiver rcvr) - : pool_(pool) - , rcvr_(std::move(rcvr)) - { - this->execute_ = [](task_base* t, std::uint32_t /* tid What is this needed for? */) noexcept - { - auto& op = *static_cast(t); - auto stoken = STDEXEC::get_stop_token(STDEXEC::get_env(op.rcvr_)); - // NOLINTNEXTLINE(bugprone-branch-clone) - if constexpr (STDEXEC::unstoppable_token) - { - STDEXEC::set_value(static_cast(op.rcvr_)); - } - else if (stoken.stop_requested()) - { - STDEXEC::set_stopped(static_cast(op.rcvr_)); - } - else - { - STDEXEC::set_value(static_cast(op.rcvr_)); - } - }; - } - - void enqueue() noexcept - { - pool_.enqueue(this); - } - - void start() & noexcept - { - enqueue(); - } - }; +namespace execpools +{ + template + using thread_pool_base + [[deprecated("execpools::thread_pool_base has been renamed to " + "exec::thread_pool_base")]] = exec::thread_pool_base; } // namespace execpools diff --git a/include/tbbexec/tbb_thread_pool.hpp b/include/tbbexec/tbb_thread_pool.hpp index c8cacf328..5726a13ed 100644 --- a/include/tbbexec/tbb_thread_pool.hpp +++ b/include/tbbexec/tbb_thread_pool.hpp @@ -16,17 +16,18 @@ */ #pragma once -#include +#include #if STDEXEC_MSVC() # pragma message( \ - "WARNING: Deprecated header file, please include the header file instead and use the execpools::tbb_thread_pool class that is identical as tbbexec::thread_pool class.") + "WARNING: Deprecated header file, please include the header file instead and use the exec::tbb::tbb_thread_pool class that is identical as tbbexec::thread_pool class.") #else # warning \ - "Deprecated header file, please include the header file instead and use the execpools::tbb_thread_pool class that is identical as tbbexec::thread_pool class." + "Deprecated header file, please include the header file instead and use the exec::tbb::tbb_thread_pool class that is identical as tbbexec::thread_pool class." #endif namespace tbbexec { - using tbb_thread_pool = execpools::tbb_thread_pool; + using tbb_thread_pool + [[deprecated("Please use exec::tbb::tbb_thread_pool instead")]] = exec::tbb::tbb_thread_pool; } // namespace tbbexec diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index cc23ea401..58e07dd84 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -111,7 +111,7 @@ add_executable(test.scratch test_main.cpp test_scratch.cpp) target_link_libraries(test.scratch PUBLIC STDEXEC::stdexec - $ + $ stdexec_executable_flags Catch2::Catch2 PRIVATE diff --git a/test/exec/CMakeLists.txt b/test/exec/CMakeLists.txt index dd7609f09..499c3fc33 100644 --- a/test/exec/CMakeLists.txt +++ b/test/exec/CMakeLists.txt @@ -60,9 +60,6 @@ set(exec_test_sources # These tests cause Microsoft's compiler to run out of memory $<$>:sequence/test_merge_each.cpp> $<$>:sequence/test_merge_each_threaded.cpp> - $<$:../execpools/test_tbb_thread_pool.cpp> - $<$:../execpools/test_taskflow_thread_pool.cpp> - $<$:../execpools/test_asio_thread_pool.cpp> $<$:test_libdispatch.cpp> test_unless_stop_requested.cpp ) @@ -71,9 +68,6 @@ add_executable(test.exec ${exec_test_sources}) target_link_libraries(test.exec PUBLIC STDEXEC::stdexec - $ - $ - $ stdexec_executable_flags Catch2::Catch2 PRIVATE @@ -86,6 +80,14 @@ if(STDEXEC_ENABLE_ASIO) add_subdirectory(asio) endif() +if(STDEXEC_ENABLE_TASKFLOW) + add_subdirectory(taskflow) +endif() + +if(STDEXEC_ENABLE_TBB) + add_subdirectory(tbb) +endif() + # Test which parses a source file for an expected error message icm_add_build_failure_test( NAME test_repeat_until_fail diff --git a/test/exec/asio/CMakeLists.txt b/test/exec/asio/CMakeLists.txt index 6bc7e72af..f7d11a0e0 100644 --- a/test/exec/asio/CMakeLists.txt +++ b/test/exec/asio/CMakeLists.txt @@ -16,6 +16,7 @@ set(asioexec_test_sources ../../test_main.cpp + test_asio_thread_pool.cpp test_completion_token.cpp test_use_sender.cpp ) @@ -24,8 +25,7 @@ add_executable(test.asioexec ${asioexec_test_sources}) target_link_libraries(test.asioexec PUBLIC STDEXEC::stdexec - $ - $ + $ stdexec_executable_flags Catch2::Catch2 PRIVATE diff --git a/test/execpools/test_asio_thread_pool.cpp b/test/exec/asio/test_asio_thread_pool.cpp similarity index 92% rename from test/execpools/test_asio_thread_pool.cpp rename to test/exec/asio/test_asio_thread_pool.cpp index 143f291ac..d65249d01 100644 --- a/test/execpools/test_asio_thread_pool.cpp +++ b/test/exec/asio/test_asio_thread_pool.cpp @@ -25,7 +25,7 @@ #include #include -#include +#include #include @@ -79,19 +79,19 @@ namespace { | then([=](std::vector&&) { return output; }); } - TEST_CASE( - "execpools::asio_thread_pool offers the parallel forward progress guarantee", - "[asio_thread_pool]") { - execpools::asio_thread_pool pool; + TEST_CASE("exec::asio::asio_thread_pool offers the parallel forward progress guarantee", + "[asio_thread_pool]") + { + exec::asio::asio_thread_pool pool; auto pool_sched = pool.get_scheduler(); CHECK( ex::get_forward_progress_guarantee(pool_sched) == ex::forward_progress_guarantee::parallel); } - TEST_CASE( - "ex::on works when changing threads with execpools::asio_thread_pool", - "[asio_thread_pool]") { - execpools::asio_thread_pool pool; + TEST_CASE("ex::on works when changing threads with exec::asio::asio_thread_pool", + "[asio_thread_pool]") + { + exec::asio::asio_thread_pool pool; auto pool_sched = pool.get_scheduler(); bool called{false}; // launch some work on the thread pool @@ -104,7 +104,7 @@ namespace { TEST_CASE("more asio_thread_pool", "[asio_thread_pool]") { using namespace STDEXEC; - execpools::asio_thread_pool pool(1ul); + exec::asio::asio_thread_pool pool(1ul); exec::static_thread_pool other_pool(1); STDEXEC::inline_scheduler inline_sched; @@ -136,7 +136,7 @@ namespace { TEST_CASE("asio_thread_pool exceptions", "[asio_thread_pool]") { using namespace STDEXEC; - execpools::asio_thread_pool taskflow_pool; + exec::asio::asio_thread_pool taskflow_pool; exec::static_thread_pool other_pool(1ul); { CHECK_THROWS(ex::sync_wait(starts_on(taskflow_pool.get_scheduler(), just(0)) | then([](auto) { @@ -160,7 +160,7 @@ namespace { TEST_CASE("asio_thread_pool async_inclusive_scan", "[asio_thread_pool]") { const auto input = std::array{1.0, 2.0, -1.0, -2.0}; std::remove_const_t output; - execpools::asio_thread_pool pool{2ul}; + exec::asio::asio_thread_pool pool{2ul}; auto [value] = ex::sync_wait(async_inclusive_scan(pool.get_scheduler(), input, output, 0.0, 4)) .value(); STATIC_REQUIRE(std::is_same_v>); @@ -171,7 +171,7 @@ namespace { TEST_CASE("asiothreadpool with exec::asio interoperability", "[asio_thread_pool]") { const auto current_thread_id = std::this_thread::get_id(); - execpools::asio_thread_pool pool{1ul}; + exec::asio::asio_thread_pool pool{1ul}; exec::asio::asio_impl::system_timer timer{pool.get_executor()}; const auto [other_thread_id] = ex::sync_wait( timer.async_wait(exec::asio::use_sender) diff --git a/test/exec/asio/test_completion_token.cpp b/test/exec/asio/test_completion_token.cpp index 2ec30b4c9..61c95d947 100644 --- a/test/exec/asio/test_completion_token.cpp +++ b/test/exec/asio/test_completion_token.cpp @@ -16,25 +16,23 @@ * limitations under the License. */ +#include #include -#include #include -#include + +#include #include #include #include -#include #include #include #include -#include #include #include #include #include #include -#include #include using namespace STDEXEC; @@ -535,9 +533,11 @@ namespace { "[asioexec][completion_token]") { std::exception_ptr ex; asio_impl::io_context ctx; - const auto initiating_function = [&](auto&& token) { + auto const initiating_function = [&](auto&& token) + { return asio_impl::async_initiate( - [&](auto&& h) { + [&](auto h) + { asio_impl::post(ctx.get_executor(), [h = std::move(h)]() noexcept { }); throw std::logic_error("Test"); }, diff --git a/test/exec/asio/test_use_sender.cpp b/test/exec/asio/test_use_sender.cpp index 49d28c1a0..71ead227d 100644 --- a/test/exec/asio/test_use_sender.cpp +++ b/test/exec/asio/test_use_sender.cpp @@ -16,17 +16,19 @@ * limitations under the License. */ +#include #include +#include +#include + #include + #include #include -#include #include #include #include -#include -#include #include #include diff --git a/test/exec/taskflow/CMakeLists.txt b/test/exec/taskflow/CMakeLists.txt new file mode 100644 index 000000000..6727d2a90 --- /dev/null +++ b/test/exec/taskflow/CMakeLists.txt @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2025 Robert Leahy. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://llvm.org/LICENSE.txt +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(taskflowexec_test_sources + ../../test_main.cpp + test_taskflow_thread_pool.cpp + ) + +add_executable(test.taskflowexec ${taskflowexec_test_sources}) +target_link_libraries(test.taskflowexec + PUBLIC + STDEXEC::stdexec + $ + stdexec_executable_flags + Catch2::Catch2 + PRIVATE + common_test_settings) + +catch_discover_tests(test.taskflowexec) diff --git a/test/execpools/test_taskflow_thread_pool.cpp b/test/exec/taskflow/test_taskflow_thread_pool.cpp similarity index 90% rename from test/execpools/test_taskflow_thread_pool.cpp rename to test/exec/taskflow/test_taskflow_thread_pool.cpp index 128686f16..8d1eb384b 100644 --- a/test/execpools/test_taskflow_thread_pool.cpp +++ b/test/exec/taskflow/test_taskflow_thread_pool.cpp @@ -22,7 +22,7 @@ #include #include -#include +#include namespace ex = STDEXEC; @@ -73,19 +73,19 @@ namespace { | then([=](std::vector&&) { return output; }); } - TEST_CASE( - "execpools::taskflow_thread_pool offers the parallel forward progress guarantee", - "[taskflow_thread_pool]") { - execpools::taskflow_thread_pool pool; + TEST_CASE("exec::taskflow::taskflow_thread_pool offers the parallel forward progress guarantee", + "[taskflow_thread_pool]") + { + exec::taskflow::taskflow_thread_pool pool; auto pool_sched = pool.get_scheduler(); CHECK( ex::get_forward_progress_guarantee(pool_sched) == ex::forward_progress_guarantee::parallel); } - TEST_CASE( - "STDEXEC::on works when changing threads with execpools::taskflow_thread_pool", - "[taskflow_thread_pool]") { - execpools::taskflow_thread_pool pool; + TEST_CASE("STDEXEC::on works when changing threads with exec::taskflow::taskflow_thread_pool", + "[taskflow_thread_pool]") + { + exec::taskflow::taskflow_thread_pool pool; auto pool_sched = pool.get_scheduler(); bool called{false}; // launch some work on the thread pool @@ -98,7 +98,7 @@ namespace { TEST_CASE("more taskflow_thread_pool", "[taskflow_thread_pool]") { using namespace STDEXEC; - execpools::taskflow_thread_pool pool(1ul); + exec::taskflow::taskflow_thread_pool pool(1ul); exec::static_thread_pool other_pool(1); STDEXEC::inline_scheduler inline_sched; @@ -130,7 +130,7 @@ namespace { TEST_CASE("taskflow_thread_pool exceptions", "[taskflow_thread_pool]") { using namespace STDEXEC; - execpools::taskflow_thread_pool taskflow_pool; + exec::taskflow::taskflow_thread_pool taskflow_pool; exec::static_thread_pool other_pool(1ul); { CHECK_THROWS( @@ -156,7 +156,7 @@ namespace { TEST_CASE("taskflow_thread_pool async_inclusive_scan", "[taskflow_thread_pool]") { const auto input = std::array{1.0, 2.0, -1.0, -2.0}; std::remove_const_t output; - execpools::taskflow_thread_pool pool{2ul}; + exec::taskflow::taskflow_thread_pool pool{2ul}; auto [value] = STDEXEC::sync_wait( async_inclusive_scan(pool.get_scheduler(), input, output, 0.0, 4)) .value(); diff --git a/test/exec/tbb/CMakeLists.txt b/test/exec/tbb/CMakeLists.txt new file mode 100644 index 000000000..f0aba9347 --- /dev/null +++ b/test/exec/tbb/CMakeLists.txt @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2025 Robert Leahy. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://llvm.org/LICENSE.txt +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(tbbexec_test_sources + ../../test_main.cpp + test_tbb_thread_pool.cpp + ) + +add_executable(test.tbbexec ${tbbexec_test_sources}) +target_link_libraries(test.tbbexec + PUBLIC + STDEXEC::stdexec + $ + stdexec_executable_flags + Catch2::Catch2 + PRIVATE + common_test_settings) + +catch_discover_tests(test.tbbexec) diff --git a/test/execpools/test_tbb_thread_pool.cpp b/test/exec/tbb/test_tbb_thread_pool.cpp similarity index 92% rename from test/execpools/test_tbb_thread_pool.cpp rename to test/exec/tbb/test_tbb_thread_pool.cpp index 3d8d4b9d2..799cc2ff5 100644 --- a/test/execpools/test_tbb_thread_pool.cpp +++ b/test/exec/tbb/test_tbb_thread_pool.cpp @@ -24,7 +24,7 @@ #include #include -#include +#include namespace ex = STDEXEC; @@ -75,19 +75,19 @@ namespace { | then([=](std::vector&&) { return output; }); } - TEST_CASE( - "execpools::tbb_thread_pool offers the parallel forward progress guarantee", - "[tbb_thread_pool]") { - execpools::tbb_thread_pool pool; + TEST_CASE("exec::tbb::tbb_thread_pool offers the parallel forward progress guarantee", + "[tbb_thread_pool]") + { + exec::tbb::tbb_thread_pool pool; auto pool_sched = pool.get_scheduler(); CHECK( ex::get_forward_progress_guarantee(pool_sched) == ex::forward_progress_guarantee::parallel); } - TEST_CASE( - "ex::on works when changing threads with execpools::tbb_thread_pool", - "[tbb_thread_pool]") { - execpools::tbb_thread_pool pool; + TEST_CASE("ex::on works when changing threads with exec::tbb::tbb_thread_pool", + "[tbb_thread_pool]") + { + exec::tbb::tbb_thread_pool pool; auto pool_sched = pool.get_scheduler(); bool called{false}; // launch some work on the thread pool @@ -100,7 +100,7 @@ namespace { TEST_CASE("more tbb_thread_pool", "[tbb_thread_pool]") { using namespace STDEXEC; - execpools::tbb_thread_pool pool(1); + exec::tbb::tbb_thread_pool pool(1); exec::static_thread_pool other_pool(1); STDEXEC::inline_scheduler inline_sched; @@ -135,7 +135,7 @@ namespace { // state. We'd better have it act normally here. using namespace STDEXEC; - execpools::tbb_thread_pool tbb_pool; + exec::tbb::tbb_thread_pool tbb_pool; exec::static_thread_pool other_pool(1); { CHECK_THROWS(ex::sync_wait(starts_on(tbb_pool.get_scheduler(), just(0)) | then([](auto) { @@ -160,7 +160,7 @@ namespace { TEST_CASE("tbb_thread_pool async_inclusive_scan", "[tbb_thread_pool]") { const auto input = std::array{1.0, 2.0, -1.0, -2.0}; std::remove_const_t output; - execpools::tbb_thread_pool pool{2}; + exec::tbb::tbb_thread_pool pool{2}; auto [value] = ex::sync_wait(async_inclusive_scan(pool.get_scheduler(), input, output, 0.0, 4)) .value(); STATIC_REQUIRE(std::is_same_v>);