Update thread_pool to 3.3

Fixes an issue with `wait_for_tasks()` and adds a lower-overhead
`push_loop` helper. We replace our usage of `parallelize_loop` with
`push_loop`, since we did not use the returned multi-future vector and
don't need the extra overhead.
This commit is contained in:
Seth Hillbrand 2022-12-02 12:01:34 -08:00
parent 50d34e232b
commit f9a36b9c91
3 changed files with 362 additions and 99 deletions

View File

@ -763,12 +763,15 @@ void CONNECTION_GRAPH::updateItemConnectivity( const SCH_SHEET_PATH& aSheet,
return 1; return 1;
}; };
GetKiCadThreadPool().parallelize_loop( 0, connection_vec.size(), thread_pool& tp = GetKiCadThreadPool();
tp.push_loop( connection_vec.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
update_lambda( connection_vec[ii] ); update_lambda( connection_vec[ii] );
}).wait(); });
tp.wait_for_tasks();
} }
} }
@ -915,12 +918,15 @@ void CONNECTION_GRAPH::resolveAllDrivers()
return 1; return 1;
}; };
GetKiCadThreadPool().parallelize_loop( 0, dirty_graphs.size(), thread_pool& tp = GetKiCadThreadPool();
tp.push_loop( dirty_graphs.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
update_lambda( dirty_graphs[ii] ); update_lambda( dirty_graphs[ii] );
}).wait(); });
tp.wait_for_tasks();
// Now discard any non-driven subgraphs from further consideration // Now discard any non-driven subgraphs from further consideration
@ -1463,12 +1469,15 @@ void CONNECTION_GRAPH::buildConnectionGraph( std::function<void( SCH_ITEM* )>* a
for( CONNECTION_SUBGRAPH* subgraph : m_driver_subgraphs ) for( CONNECTION_SUBGRAPH* subgraph : m_driver_subgraphs )
m_sheet_to_subgraphs_map[ subgraph->m_sheet ].emplace_back( subgraph ); m_sheet_to_subgraphs_map[ subgraph->m_sheet ].emplace_back( subgraph );
GetKiCadThreadPool().parallelize_loop( 0, m_driver_subgraphs.size(), thread_pool& tp = GetKiCadThreadPool();
tp.push_loop( m_driver_subgraphs.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
m_driver_subgraphs[ii]->UpdateItemConnections(); m_driver_subgraphs[ii]->UpdateItemConnections();
}).wait(); });
tp.wait_for_tasks();
// Next time through the subgraphs, we do some post-processing to handle things like // Next time through the subgraphs, we do some post-processing to handle things like
// connecting bus members to their neighboring subgraphs, and then propagate connections // connecting bus members to their neighboring subgraphs, and then propagate connections
@ -1656,12 +1665,13 @@ void CONNECTION_GRAPH::buildConnectionGraph( std::function<void( SCH_ITEM* )>* a
return 1; return 1;
}; };
GetKiCadThreadPool().parallelize_loop( 0, m_driver_subgraphs.size(), tp.push_loop( m_driver_subgraphs.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
updateItemConnectionsTask( m_driver_subgraphs[ii] ); updateItemConnectionsTask( m_driver_subgraphs[ii] );
}).wait(); });
tp.wait_for_tasks();
m_net_code_to_subgraphs_map.clear(); m_net_code_to_subgraphs_map.clear();
m_net_name_to_subgraphs_map.clear(); m_net_name_to_subgraphs_map.clear();

View File

@ -177,12 +177,15 @@ void CONNECTIVITY_DATA::updateRatsnest()
return aNet->IsDirty() && aNet->GetNodeCount() > 0; return aNet->IsDirty() && aNet->GetNodeCount() > 0;
} ); } );
GetKiCadThreadPool().parallelize_loop( 0, dirty_nets.size(), thread_pool& tp = GetKiCadThreadPool();
tp.push_loop( dirty_nets.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
dirty_nets[ii]->UpdateNet(); dirty_nets[ii]->UpdateNet();
} ).wait(); } );
tp.wait_for_tasks();
#ifdef PROFILE #ifdef PROFILE
rnUpdate.Show(); rnUpdate.Show();
@ -347,12 +350,15 @@ void CONNECTIVITY_DATA::ComputeLocalRatsnest( const std::vector<BOARD_ITEM*>& aI
} }
}; };
GetKiCadThreadPool().parallelize_loop( 1, aDynamicData->m_nets.size(), thread_pool& tp = GetKiCadThreadPool();
tp.push_loop( 1, aDynamicData->m_nets.size(),
[&]( const int a, const int b) [&]( const int a, const int b)
{ {
for( int ii = a; ii < b; ++ii ) for( int ii = a; ii < b; ++ii )
update_lambda( ii ); update_lambda( ii );
}).wait(); });
tp.wait_for_tasks();
// This gets the ratsnest for internal connections in the moving set // This gets the ratsnest for internal connections in the moving set
const std::vector<CN_EDGE>& edges = GetRatsnestForItems( aItems ); const std::vector<CN_EDGE>& edges = GetRatsnestForItems( aItems );

View File

@ -3,32 +3,35 @@
/** /**
* @file BS_thread_pool.hpp * @file BS_thread_pool.hpp
* @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)
* @version 3.0.0 * @version 3.3.0
* @date 2022-05-30 * @date 2022-08-03
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
* *
* @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS:synced_stream, and BS::timer. * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer.
*/ */
#define BS_THREAD_POOL_VERSION "v3.0.0 (2022-05-30)" #define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03)"
#include <atomic> // std::atomic #include <atomic> // std::atomic
#include <chrono> // std::chrono #include <chrono> // std::chrono
#include <condition_variable> // std::condition_variable #include <condition_variable> // std::condition_variable
#include <exception> // std::current_exception #include <exception> // std::current_exception
#include <functional> // std::function #include <functional> // std::bind, std::function, std::invoke
#include <future> // std::future, std::promise #include <future> // std::future, std::promise
#include <iostream> // std::cout, std::ostream #include <iostream> // std::cout, std::endl, std::flush, std::ostream
#include <memory> // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr #include <memory> // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr
#include <mutex> // std::mutex, std::scoped_lock, std::unique_lock #include <mutex> // std::mutex, std::scoped_lock, std::unique_lock
#include <queue> // std::queue #include <queue> // std::queue
#include <thread> // std::thread #include <thread> // std::thread
#include <type_traits> // std::common_type_t, std::decay_t, std::is_void_v, std::invoke_result_t #include <type_traits> // std::common_type_t, std::conditional_t, std::decay_t, std::invoke_result_t, std::is_void_v
#include <utility> // std::move, std::swap #include <utility> // std::forward, std::move, std::swap
#include <vector> // std::vector #include <vector> // std::vector
namespace BS namespace BS
{ {
/**
* @brief A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsigned int.
*/
using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>; using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
// ============================================================================================= // // ============================================================================================= //
@ -36,9 +39,11 @@ using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concur
/** /**
* @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once. * @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once.
*
* @tparam T The return type of the futures.
*/ */
template <typename T> template <typename T>
class multi_future class [[nodiscard]] multi_future
{ {
public: public:
/** /**
@ -46,46 +51,193 @@ public:
* *
* @param num_futures_ The desired number of futures to store. * @param num_futures_ The desired number of futures to store.
*/ */
explicit multi_future(const size_t num_futures_ = 0) : f(num_futures_) {} multi_future(const size_t num_futures_ = 0) : futures(num_futures_) {}
/** /**
* @brief Get the results from all the futures stored in this multi_future object. * @brief Get the results from all the futures stored in this multi_future object, rethrowing any stored exceptions.
* *
* @return A vector containing the results. * @return If the futures return void, this function returns void as well. Otherwise, it returns a vector containing the results.
*/ */
std::vector<T> get() [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>> get()
{ {
std::vector<T> results(f.size()); if constexpr (std::is_void_v<T>)
for (size_t i = 0; i < f.size(); ++i) {
results[i] = f[i].get(); for (size_t i = 0; i < futures.size(); ++i)
futures[i].get();
return;
}
else
{
std::vector<T> results(futures.size());
for (size_t i = 0; i < futures.size(); ++i)
results[i] = futures[i].get();
return results; return results;
} }
}
/**
* @brief Get a reference to one of the futures stored in this multi_future object.
*
* @param i The index of the desired future.
* @return The future.
*/
[[nodiscard]] std::future<T>& operator[](const size_t i)
{
return futures[i];
}
/**
* @brief Append a future to this multi_future object.
*
* @param future The future to append.
*/
void push_back(std::future<T> future)
{
futures.push_back(std::move(future));
}
/**
* @brief Get the number of futures stored in this multi_future object.
*
* @return The number of futures.
*/
[[nodiscard]] size_t size() const
{
return futures.size();
}
/** /**
* @brief Wait for all the futures stored in this multi_future object. * @brief Wait for all the futures stored in this multi_future object.
*/ */
void wait() const void wait() const
{ {
for (size_t i = 0; i < f.size(); ++i) for (size_t i = 0; i < futures.size(); ++i)
f[i].wait(); futures[i].wait();
} }
private:
/** /**
* @brief A vector to store the futures. * @brief A vector to store the futures.
*/ */
std::vector<std::future<T>> f; std::vector<std::future<T>> futures;
}; };
// End class multi_future // // End class multi_future //
// ============================================================================================= // // ============================================================================================= //
// ============================================================================================= //
// Begin class blocks //
/**
* @brief A helper class to divide a range into blocks. Used by parallelize_loop() and push_loop().
*
* @tparam T1 The type of the first index in the range. Should be a signed or unsigned integer.
* @tparam T2 The type of the index after the last index in the range. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred.
* @tparam T The common type of T1 and T2.
*/
template <typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
class [[nodiscard]] blocks
{
public:
/**
* @brief Construct a blocks object with the given specifications.
*
* @param first_index_ The first index in the range.
* @param index_after_last_ The index after the last index in the range.
* @param num_blocks_ The desired number of blocks to divide the range into.
*/
blocks(const T1 first_index_, const T2 index_after_last_, const size_t num_blocks_) : first_index(static_cast<T>(first_index_)), index_after_last(static_cast<T>(index_after_last_)), num_blocks(num_blocks_)
{
if (index_after_last < first_index)
std::swap(index_after_last, first_index);
total_size = static_cast<size_t>(index_after_last - first_index);
block_size = static_cast<size_t>(total_size / num_blocks);
if (block_size == 0)
{
block_size = 1;
num_blocks = (total_size > 1) ? total_size : 1;
}
}
/**
* @brief Get the first index of a block.
*
* @param i The block number.
* @return The first index.
*/
[[nodiscard]] T start(const size_t i) const
{
return static_cast<T>(i * block_size) + first_index;
}
/**
* @brief Get the index after the last index of a block.
*
* @param i The block number.
* @return The index after the last index.
*/
[[nodiscard]] T end(const size_t i) const
{
return (i == num_blocks - 1) ? index_after_last : (static_cast<T>((i + 1) * block_size) + first_index);
}
/**
* @brief Get the number of blocks. Note that this may be different than the desired number of blocks that was passed to the constructor.
*
* @return The number of blocks.
*/
[[nodiscard]] size_t get_num_blocks() const
{
return num_blocks;
}
/**
* @brief Get the total number of indices in the range.
*
* @return The total number of indices.
*/
[[nodiscard]] size_t get_total_size() const
{
return total_size;
}
private:
/**
* @brief The size of each block (except possibly the last block).
*/
size_t block_size = 0;
/**
* @brief The first index in the range.
*/
T first_index = 0;
/**
* @brief The index after the last index in the range.
*/
T index_after_last = 0;
/**
* @brief The number of blocks.
*/
size_t num_blocks = 0;
/**
* @brief The total number of indices in the range.
*/
size_t total_size = 0;
};
// End class blocks //
// ============================================================================================= //
// ============================================================================================= // // ============================================================================================= //
// Begin class thread_pool // // Begin class thread_pool //
/** /**
* @brief A fast, lightweight, and easy-to-use C++17 thread pool class. * @brief A fast, lightweight, and easy-to-use C++17 thread pool class.
*/ */
class thread_pool class [[nodiscard]] thread_pool
{ {
public: public:
// ============================ // ============================
@ -97,13 +249,13 @@ public:
* *
* @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads. * @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
*/ */
explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) : thread_count(thread_count_ ? thread_count_ : std::thread::hardware_concurrency()), threads(std::make_unique<std::thread[]>(thread_count_ ? thread_count_ : std::thread::hardware_concurrency())) thread_pool(const concurrency_t thread_count_ = 0) : thread_count(determine_thread_count(thread_count_)), threads(std::make_unique<std::thread[]>(determine_thread_count(thread_count_)))
{ {
create_threads(); create_threads();
} }
/** /**
* @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the variable paused is set to true, then any tasks still in the queue will never be executed. * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
*/ */
~thread_pool() ~thread_pool()
{ {
@ -120,7 +272,7 @@ public:
* *
* @return The number of queued tasks. * @return The number of queued tasks.
*/ */
size_t get_tasks_queued() const [[nodiscard]] size_t get_tasks_queued() const
{ {
const std::scoped_lock tasks_lock(tasks_mutex); const std::scoped_lock tasks_lock(tasks_mutex);
return tasks.size(); return tasks.size();
@ -131,7 +283,7 @@ public:
* *
* @return The number of running tasks. * @return The number of running tasks.
*/ */
size_t get_tasks_running() const [[nodiscard]] size_t get_tasks_running() const
{ {
const std::scoped_lock tasks_lock(tasks_mutex); const std::scoped_lock tasks_lock(tasks_mutex);
return tasks_total - tasks.size(); return tasks_total - tasks.size();
@ -142,7 +294,7 @@ public:
* *
* @return The total number of tasks. * @return The total number of tasks.
*/ */
size_t get_tasks_total() const [[nodiscard]] size_t get_tasks_total() const
{ {
return tasks_total; return tasks_total;
} }
@ -152,13 +304,23 @@ public:
* *
* @return The number of threads. * @return The number of threads.
*/ */
concurrency_t get_thread_count() const [[nodiscard]] concurrency_t get_thread_count() const
{ {
return thread_count; return thread_count;
} }
/** /**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. * @brief Check whether the pool is currently paused.
*
* @return true if the pool is paused, false if it is not paused.
*/
[[nodiscard]] bool is_paused() const
{
return paused;
}
/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks.
* *
* @tparam F The type of the function to loop through. * @tparam F The type of the function to loop through.
* @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer. * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer.
@ -166,56 +328,106 @@ public:
* @tparam T The common type of T1 and T2. * @tparam T The common type of T1 and T2.
* @tparam R The return value of the loop function F (can be void). * @tparam R The return value of the loop function F (can be void).
* @param first_index The first index in the loop. * @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if first_index == index_after_last, no blocks will be submitted. * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted.
* @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
* @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
* @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can be used to obtain the values returned by each block. * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block.
*/ */
template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>, typename R = std::invoke_result_t<std::decay_t<F>, T, T>> template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
multi_future<R> parallelize_loop(const T1& first_index, const T2& index_after_last, const F& loop, size_t num_blocks = 0) [[nodiscard]] multi_future<R> parallelize_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0)
{ {
T first_index_T = static_cast<T>(first_index); blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
T index_after_last_T = static_cast<T>(index_after_last); if (blks.get_total_size() > 0)
if (first_index_T == index_after_last_T)
return multi_future<R>();
if (index_after_last_T < first_index_T)
std::swap(index_after_last_T, first_index_T);
if (num_blocks == 0)
num_blocks = thread_count;
const size_t total_size = static_cast<size_t>(index_after_last_T - first_index_T);
size_t block_size = static_cast<size_t>(total_size / num_blocks);
if (block_size == 0)
{ {
block_size = 1; multi_future<R> mf(blks.get_num_blocks());
num_blocks = total_size > 1 ? total_size : 1; for (size_t i = 0; i < blks.get_num_blocks(); ++i)
} mf[i] = submit(std::forward<F>(loop), blks.start(i), blks.end(i));
multi_future<R> mf(num_blocks);
for (size_t i = 0; i < num_blocks; ++i)
{
const T start = (static_cast<T>(i * block_size) + first_index_T);
const T end = (i == num_blocks - 1) ? index_after_last_T : (static_cast<T>((i + 1) * block_size) + first_index_T);
mf.f[i] = submit(loop, start, end);
}
return mf; return mf;
} }
else
{
return multi_future<R>();
}
}
/** /**
* @brief Push a function with zero or more arguments, but no return value, into the task queue. * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks. This overload is used for the special case where the first index is 0.
*
* @tparam F The type of the function to loop through.
* @tparam T The type of the loop indices. Should be a signed or unsigned integer.
* @tparam R The return value of the loop function F (can be void).
* @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted.
* @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
* @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
* @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block.
*/
template <typename F, typename T, typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
[[nodiscard]] multi_future<R> parallelize_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0)
{
return parallelize_loop(0, index_after_last, std::forward<F>(loop), num_blocks);
}
/**
* @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished.
*/
void pause()
{
paused = true;
}
/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
*
* @tparam F The type of the function to loop through.
* @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer.
* @tparam T2 The type of the index after the last index in the loop. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred.
* @tparam T The common type of T1 and T2.
* @param first_index The first index in the loop.
* @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted.
* @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
* @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
*/
template <typename F, typename T1, typename T2, typename T = std::common_type_t<T1, T2>>
void push_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0)
{
blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count);
if (blks.get_total_size() > 0)
{
for (size_t i = 0; i < blks.get_num_blocks(); ++i)
push_task(std::forward<F>(loop), blks.start(i), blks.end(i));
}
}
/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. This overload is used for the special case where the first index is 0.
*
* @tparam F The type of the function to loop through.
* @tparam T The type of the loop indices. Should be a signed or unsigned integer.
* @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted.
* @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)".
* @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool.
*/
template <typename F, typename T>
void push_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0)
{
push_loop(0, index_after_last, std::forward<F>(loop), num_blocks);
}
/**
* @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
* *
* @tparam F The type of the function. * @tparam F The type of the function.
* @tparam A The types of the arguments. * @tparam A The types of the arguments.
* @param task The function to push. * @param task The function to push.
* @param args The arguments to pass to the function. * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments.
*/ */
template <typename F, typename... A> template <typename F, typename... A>
void push_task(const F& task, const A&... args) void push_task(F&& task, A&&... args)
{ {
std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
{ {
const std::scoped_lock tasks_lock(tasks_mutex); const std::scoped_lock tasks_lock(tasks_mutex);
if constexpr (sizeof...(args) == 0) tasks.push(task_function);
tasks.push(std::function<void()>(task));
else
tasks.push(std::function<void()>([task, args...] { task(args...); }));
} }
++tasks_total; ++tasks_total;
task_available_cv.notify_one(); task_available_cv.notify_one();
@ -226,13 +438,13 @@ public:
* *
* @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads. * @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
*/ */
void reset(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) void reset(const concurrency_t thread_count_ = 0)
{ {
const bool was_paused = paused; const bool was_paused = paused;
paused = true; paused = true;
wait_for_tasks(); wait_for_tasks();
destroy_threads(); destroy_threads();
thread_count = thread_count_ ? thread_count_ : std::thread::hardware_concurrency(); thread_count = determine_thread_count(thread_count_);
threads = std::make_unique<std::thread[]>(thread_count); threads = std::make_unique<std::thread[]>(thread_count);
paused = was_paused; paused = was_paused;
create_threads(); create_threads();
@ -245,26 +457,27 @@ public:
* @tparam A The types of the zero or more arguments to pass to the function. * @tparam A The types of the zero or more arguments to pass to the function.
* @tparam R The return type of the function (can be void). * @tparam R The return type of the function (can be void).
* @param task The function to submit. * @param task The function to submit.
* @param args The zero or more arguments to pass to the function. * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments.
* @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one. * @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one.
*/ */
template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>> template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
std::future<R> submit(const F& task, const A&... args) [[nodiscard]] std::future<R> submit(F&& task, A&&... args)
{ {
std::function<R()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>(); std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
push_task( push_task(
[task, args..., task_promise] [task_function, task_promise]
{ {
try try
{ {
if constexpr (std::is_void_v<R>) if constexpr (std::is_void_v<R>)
{ {
task(args...); std::invoke(task_function);
task_promise->set_value(); task_promise->set_value();
} }
else else
{ {
task_promise->set_value(task(args...)); task_promise->set_value(std::invoke(task_function));
} }
} }
catch (...) catch (...)
@ -281,6 +494,14 @@ public:
return task_promise->get_future(); return task_promise->get_future();
} }
/**
 * @brief Unpause the pool. The workers will resume retrieving new tasks out of the queue.
 *
 * Counterpart to setting the pause flag: clears the atomic `paused` flag so worker
 * threads may once again pop queued tasks. Tasks that were already executing were
 * never interrupted by pausing, so nothing else needs to happen here.
 */
void unpause()
{
paused = false; // atomic store; workers observe it on their next queue check
}
/** /**
* @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future. * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future.
*/ */
@ -292,15 +513,6 @@ public:
waiting = false; waiting = false;
} }
// ===========
// Public data
// ===========
/**
* @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. Set to false again to resume retrieving tasks.
*/
std::atomic<bool> paused = false;
private: private:
// ======================== // ========================
// Private member functions // Private member functions
@ -331,6 +543,25 @@ private:
} }
} }
/**
 * @brief Decide how many worker threads the pool should have, based on the parameter passed to the constructor or reset().
 *
 * @param thread_count_ The requested thread count. A positive value is used as-is. A non-positive value (including the default of 0) means "use every hardware thread available", as reported by std::thread::hardware_concurrency(); if that in turn reports a non-positive number, fall back to a single thread.
 * @return The number of threads to construct the pool with; always at least 1.
 */
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_)
{
    // An explicit positive request wins outright.
    if (thread_count_ > 0)
        return thread_count_;
    // Otherwise ask the implementation; it is allowed to report 0, in which
    // case we still guarantee one worker so the pool can make progress.
    const concurrency_t hardware_threads = std::thread::hardware_concurrency();
    return (hardware_threads > 0) ? hardware_threads : 1;
}
/** /**
* @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by push_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait_for_tasks() in case it is waiting. * @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by push_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait_for_tasks() in case it is waiting.
*/ */
@ -340,13 +571,14 @@ private:
{ {
std::function<void()> task; std::function<void()> task;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex); std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_available_cv.wait(tasks_lock, [&] { return !tasks.empty() || !running; }); task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !running; });
if (running && !paused) if (running && !paused)
{ {
task = std::move(tasks.front()); task = std::move(tasks.front());
tasks.pop(); tasks.pop();
tasks_lock.unlock(); tasks_lock.unlock();
task(); task();
tasks_lock.lock();
--tasks_total; --tasks_total;
if (waiting) if (waiting)
task_done_cv.notify_one(); task_done_cv.notify_one();
@ -358,6 +590,11 @@ private:
// Private data // Private data
// ============ // ============
/**
* @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks.
*/
std::atomic<bool> paused = false;
/** /**
* @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working. * @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working.
*/ */
@ -413,7 +650,7 @@ private:
/** /**
* @brief A helper class to synchronize printing to an output stream by different threads. * @brief A helper class to synchronize printing to an output stream by different threads.
*/ */
class synced_stream class [[nodiscard]] synced_stream
{ {
public: public:
/** /**
@ -421,7 +658,7 @@ public:
* *
* @param out_stream_ The output stream to print to. The default value is std::cout. * @param out_stream_ The output stream to print to. The default value is std::cout.
*/ */
explicit synced_stream(std::ostream& out_stream_ = std::cout) : out_stream(out_stream_) {}; synced_stream(std::ostream& out_stream_ = std::cout) : out_stream(out_stream_) {}
/** /**
* @brief Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print. * @brief Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
@ -430,10 +667,10 @@ public:
* @param items The items to print. * @param items The items to print.
*/ */
template <typename... T> template <typename... T>
void print(const T&... items) void print(T&&... items)
{ {
const std::scoped_lock lock(stream_mutex); const std::scoped_lock lock(stream_mutex);
(out_stream << ... << items); (out_stream << ... << std::forward<T>(items));
} }
/**
 * @brief Print any number of items into the output stream, followed by a newline character. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
 *
 * @tparam T The types of the items to print.
 * @param items The items to print.
 */
template <typename... T>
void println(T&&... items)
{
    // Delegate to print() so the trailing '\n' is emitted under the same
    // single lock acquisition as the items themselves.
    print(std::forward<T>(items)..., '\n');
}
/**
 * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::endl). Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise '\n' should be used instead.
 */
// Why the cast: std::endl is a function template (an overload set), so its bare
// name cannot bind to a reference; the explicit cast selects the std::ostream
// instantiation, giving a plain function reference that print()/println() can
// accept as an argument.
inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
/**
 * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::flush). Used to flush the stream.
 */
// Same cast trick as endl above, for the std::flush manipulator.
inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);
private: private:
/** /**
* @brief The output stream to print to. * @brief The output stream to print to.
@ -469,7 +716,7 @@ private:
/** /**
* @brief A helper class to measure execution time for benchmarking purposes. * @brief A helper class to measure execution time for benchmarking purposes.
*/ */
class timer class [[nodiscard]] timer
{ {
public: public:
/** /**
@ -493,7 +740,7 @@ public:
* *
* @return The number of milliseconds. * @return The number of milliseconds.
*/ */
std::chrono::milliseconds::rep ms() const [[nodiscard]] std::chrono::milliseconds::rep ms() const
{ {
return (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)).count(); return (std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)).count();
} }